Repository: ignite
Updated Branches:
  refs/heads/ignite-1232 75e97dc3a -> 87464e230


ignite-1232


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87464e23
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87464e23
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87464e23

Branch: refs/heads/ignite-1232
Commit: 87464e230ebcb39bc32dad85f8c35bc0b08cb93d
Parents: 75e97dc
Author: sboikov <[email protected]>
Authored: Thu Jul 14 09:34:58 2016 +0300
Committer: sboikov <[email protected]>
Committed: Thu Jul 14 13:16:57 2016 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2SpatialIndex.java        |   2 +-
 .../query/h2/opt/GridH2CollocationModel.java    |  43 +++--
 .../query/h2/opt/GridH2IndexBase.java           |   9 +-
 .../query/h2/opt/GridH2TreeIndex.java           |   4 +-
 .../query/h2/sql/GridSqlQueryParser.java        |  21 ++-
 .../query/h2/sql/GridSqlQuerySplitter.java      |   6 +-
 .../IgniteCacheCrossCacheJoinRandomTest.java    |  25 ++-
 ...ributedJoinPartitionedAndReplicatedTest.java |  84 +++++++--
 ...PartitionedAndReplicatedCollocationTest.java |  14 +-
 .../query/IgniteSqlSplitterSelfTest.java        | 183 ++++++++++++++-----
 10 files changed, 279 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index 8c7ce1f..a64c75b 100644
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -288,7 +288,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
         }
         while (i.hasNext());
 
-        return filter(rows.iterator(), threadLocalFilter(getTable(), filter));
+        return filter(rows.iterator(), threadLocalFilter());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
index d8757a0..d93cab4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
@@ -226,7 +226,7 @@ public final class GridH2CollocationModel {
             // We are at table instance.
             GridH2Table tbl = 
(GridH2Table)upper.childFilters[filter].getTable();
 
-            // Backup filter is used for REPLICATED cache if this is first 
cache in query.
+            // Only partitioned tables will do distributed joins.
             if (!tbl.isPartitioned()) {
                 type = Type.REPLICATED;
                 multiplier = MULTIPLIER_COLLOCATED;
@@ -239,34 +239,33 @@ public final class GridH2CollocationModel {
             // to all the affinity nodes the "base" does not need to get 
remote results.
             if (!upper.findPartitionedTableBefore(filter)) {
                 type = Type.PARTITIONED_COLLOCATED;
-                multiplier = upper.previousReplicated(filter) ? 
MULTIPLIER_REPLICATED_NOT_LAST : MULTIPLIER_COLLOCATED;
-
-                return;
+                multiplier = MULTIPLIER_COLLOCATED;
             }
+            else {
+                // It is enough to make sure that our previous join by 
affinity key is collocated, then we are
+                // collocated. If we at least have affinity key condition, 
then we do unicast which is cheaper.
+                switch (upper.joinedWithCollocated(filter)) {
+                    case JOINED_WITH_COLLOCATED:
+                        type = Type.PARTITIONED_COLLOCATED;
+                        multiplier = MULTIPLIER_COLLOCATED;
 
-            // It is enough to make sure that our previous join by affinity 
key is collocated, then we are
-            // collocated. If we at least have affinity key condition, then we 
do unicast which is cheaper.
-            switch (upper.joinedWithCollocated(filter)) {
-                case JOINED_WITH_COLLOCATED:
-                    type = Type.PARTITIONED_COLLOCATED;
-                    multiplier = MULTIPLIER_COLLOCATED;
-
-                    break;
+                        break;
 
-                case HAS_AFFINITY_CONDITION:
-                    type = Type.PARTITIONED_NOT_COLLOCATED;
-                    multiplier = MULTIPLIER_UNICAST;
+                    case HAS_AFFINITY_CONDITION:
+                        type = Type.PARTITIONED_NOT_COLLOCATED;
+                        multiplier = MULTIPLIER_UNICAST;
 
-                    break;
+                        break;
 
-                case NONE:
-                    type = Type.PARTITIONED_NOT_COLLOCATED;
-                    multiplier = MULTIPLIER_BROADCAST;
+                    case NONE:
+                        type = Type.PARTITIONED_NOT_COLLOCATED;
+                        multiplier = MULTIPLIER_BROADCAST;
 
-                    break;
+                        break;
 
-                default:
-                    throw new IllegalStateException();
+                    default:
+                        throw new IllegalStateException();
+                }
             }
 
             if (upper.previousReplicated(filter))

http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 985d4ba..3dd23b4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -205,17 +205,12 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * @param tbl Table.
-     * @param tblFilter Table filter.
      * @return Filter for currently running query or {@code null} if none.
      */
-    protected static IndexingQueryFilter threadLocalFilter(GridH2Table tbl, 
TableFilter tblFilter) {
+    protected static IndexingQueryFilter threadLocalFilter() {
         GridH2QueryContext qctx = GridH2QueryContext.get();
 
-        if (qctx != null)
-            return qctx.filter();
-
-        return null;
+        return qctx != null ? qctx.filter() : null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 5dfc8e6..d34eec8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -413,7 +413,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
     /** {@inheritDoc} */
     @Override public long getRowCount(@Nullable Session ses) {
-        IndexingQueryFilter f = threadLocalFilter(getTable(), null);
+        IndexingQueryFilter f = threadLocalFilter();
 
         // Fast path if we don't need to perform any filtering.
         if (f == null || f.forSpace((getTable()).spaceName()) == null)
@@ -551,7 +551,7 @@ public class GridH2TreeIndex extends GridH2IndexBase 
implements Comparator<GridS
         TableFilter filter) {
         ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = 
treeForRead();
 
-        return doFind0(t, first, includeFirst, last, 
threadLocalFilter(getTable(), filter));
+        return doFind0(t, first, includeFirst, last, threadLocalFilter());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 9fb82e1..30226c8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -50,6 +50,8 @@ import org.h2.expression.Parameter;
 import org.h2.expression.Subquery;
 import org.h2.expression.TableFunction;
 import org.h2.expression.ValueExpression;
+import org.h2.index.Index;
+import org.h2.index.ViewIndex;
 import org.h2.jdbc.JdbcPreparedStatement;
 import org.h2.result.SortOrder;
 import org.h2.table.Column;
@@ -254,7 +256,11 @@ public class GridSqlQueryParser {
             else if (tbl instanceof TableView) {
                 Query qry = VIEW_QUERY.get((TableView)tbl);
 
-                res = new GridSqlSubquery(parse(qry));
+                Index idx = filter.getIndex();
+
+                Query idxQry = idx instanceof ViewIndex ? 
((ViewIndex)idx).getQuery() : null;
+
+                res = new GridSqlSubquery(parse(qry, idxQry));
             }
             else if (tbl instanceof FunctionTable)
                 res = parseExpression(FUNC_EXPR.get((FunctionTable)tbl), 
false);
@@ -281,7 +287,7 @@ public class GridSqlQueryParser {
     /**
      * @param select Select.
      */
-    public GridSqlSelect parse(Select select) {
+    public GridSqlSelect parse(Select select, @Nullable Query idxQry) {
         GridSqlSelect res = (GridSqlSelect)h2ObjToGridObj.get(select);
 
         if (res != null)
@@ -300,6 +306,9 @@ public class GridSqlQueryParser {
 
         TableFilter filter = select.getTopTableFilter();
 
+        if (idxQry instanceof Select)
+            filter = ((Select)idxQry).getTopTableFilter();
+
         do {
             assert0(filter != null, select);
             assert0(filter.getNestedJoin() == null, select);
@@ -374,14 +383,18 @@ public class GridSqlQueryParser {
         throw new CacheException("Unsupported query: " + qry);
     }
 
+    public GridSqlQuery parse(Prepared qry) {
+        return parse(qry, null);
+    }
+
     /**
      * @param qry Select.
      */
-    public GridSqlQuery parse(Prepared qry) {
+    public GridSqlQuery parse(Prepared qry, @Nullable Query idxQry) {
         assert qry != null;
 
         if (qry instanceof Select)
-            return parse((Select)qry);
+            return parse((Select)qry, idxQry);
 
         if (qry instanceof SelectUnion)
             return parse((SelectUnion)qry);

http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 0c93838..f37277d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -145,14 +145,14 @@ public class GridSqlQuerySplitter {
     /**
      * @param stmt Prepared statement.
      * @param params Parameters.
-     * @param collocatedGroupBy Whether the query has collocated GROUP BY keys.
+     * @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
      * @param distributedJoins If distributed joins enabled.
      * @return Two step query.
      */
     public static GridCacheTwoStepQuery split(
         JdbcPreparedStatement stmt,
         Object[] params,
-        final boolean collocatedGroupBy,
+        final boolean collocatedGrpBy,
         final boolean distributedJoins
     ) {
         if (params == null)
@@ -175,7 +175,7 @@ public class GridSqlQuerySplitter {
         // nullifying or updating things, have to make sure that we will not 
need them in the original form later.
         final GridSqlSelect mapQry = wrapUnion(qry);
 
-        GridCacheSqlQuery rdc = split(res, 0, mapQry, params, 
collocatedGroupBy);
+        GridCacheSqlQuery rdc = split(res, 0, mapQry, params, collocatedGrpBy);
 
         res.reduceQuery(rdc);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
index 3b9d85e..1c48e41 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
@@ -74,7 +74,9 @@ public class IgniteCacheCrossCacheJoinRandomTest extends AbstractH2CompareQueryT
     /** */
     private static final List<T2<CacheMode, Integer>> MODES_1 = F.asList(
         new T2<>(REPLICATED, 0),
-        new T2<>(PARTITIONED, 0));
+        new T2<>(PARTITIONED, 0),
+        new T2<>(PARTITIONED, 1),
+        new T2<>(PARTITIONED, 2));
 
     /** */
     private static final List<T2<CacheMode, Integer>> MODES_2 = F.asList(
@@ -281,11 +283,16 @@ public class IgniteCacheCrossCacheJoinRandomTest extends 
AbstractH2CompareQueryT
         try {
             IgniteCache cache = null;
 
+            boolean hasReplicated = false;
+
             for (int i = 0; i < CACHES; i++) {
                 CacheConfiguration ccfg = ccfgs.get(i);
 
                 IgniteCache cache0 = client.createCache(ccfg);
 
+                if (ccfg.getCacheMode() == REPLICATED)
+                    hasReplicated = true;
+
                 if (cache == null && ccfg.getCacheMode() == PARTITIONED)
                     cache = cache0;
 
@@ -303,9 +310,13 @@ public class IgniteCacheCrossCacheJoinRandomTest extends 
AbstractH2CompareQueryT
 
             Object[] args = {};
 
-            compareQueryRes0(cache, createQuery(CACHES, false, null), 
distributedJoin, true, args, Ordering.RANDOM);
+            compareQueryRes0(cache, createQuery(CACHES, false, null), 
distributedJoin, false, args, Ordering.RANDOM);
 
-            compareQueryRes0(cache, createQuery(CACHES, true, null), 
distributedJoin, true, args, Ordering.RANDOM);
+            if (!hasReplicated) {
+                compareQueryRes0(cache, createQuery(CACHES, false, null), 
distributedJoin, true, args, Ordering.RANDOM);
+
+                compareQueryRes0(cache, createQuery(CACHES, true, null), 
distributedJoin, true, args, Ordering.RANDOM);
+            }
 
             Map<Integer, Integer> data = cachesData.get(CACHES - 1);
 
@@ -314,9 +325,13 @@ public class IgniteCacheCrossCacheJoinRandomTest extends 
AbstractH2CompareQueryT
             int cnt = 0;
 
             for (Integer objId : data.keySet()) {
-                compareQueryRes0(cache, createQuery(CACHES, false, objId), 
distributedJoin, true, args, Ordering.RANDOM);
+                compareQueryRes0(cache, createQuery(CACHES, false, objId), 
distributedJoin, false, args, Ordering.RANDOM);
+
+                if (!hasReplicated) {
+                    compareQueryRes0(cache, createQuery(CACHES, false, objId), 
distributedJoin, true, args, Ordering.RANDOM);
 
-                compareQueryRes0(cache, createQuery(CACHES, true, objId), 
distributedJoin, true, args, Ordering.RANDOM);
+                    compareQueryRes0(cache, createQuery(CACHES, true, objId), 
distributedJoin, true, args, Ordering.RANDOM);
+                }
 
                 if (cnt++ == QRY_CNT)
                     break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
index 41014ea..2e4967c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -40,6 +41,7 @@ import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
@@ -77,28 +79,38 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
 
     /**
      * @param name Cache name.
+     * @param cacheMode Cache mode.
      * @return Cache configuration.
      */
-    private CacheConfiguration configuration(String name) {
+    private CacheConfiguration configuration(String name, CacheMode cacheMode) 
{
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setName(name);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
         ccfg.setAtomicWriteOrderMode(PRIMARY);
         ccfg.setAtomicityMode(ATOMIC);
-        ccfg.setBackups(1);
+        ccfg.setCacheMode(cacheMode);
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(1);
 
         return ccfg;
     }
 
-    private List<CacheConfiguration> caches(boolean idx) {
+    /**
+     * @param idx Use index flag.
+     * @param persCacheMode Person cache mode.
+     * @param orgCacheMode Organization cache mode.
+     * @param accCacheMode Account cache mode.
+     * @return Configurations.
+     */
+    private List<CacheConfiguration> caches(boolean idx,
+        CacheMode persCacheMode,
+        CacheMode orgCacheMode,
+        CacheMode accCacheMode) {
         List<CacheConfiguration> ccfgs = new ArrayList<>();
 
         {
-            CacheConfiguration ccfg = configuration(PERSON_CACHE);
-
-            // One cache is replicated.
-            ccfg.setCacheMode(REPLICATED);
+            CacheConfiguration ccfg = configuration(PERSON_CACHE, 
persCacheMode);
 
             QueryEntity entity = new QueryEntity();
             entity.setKeyType(Integer.class.getName());
@@ -115,7 +127,7 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
         }
 
         {
-            CacheConfiguration ccfg = configuration(ORG_CACHE);
+            CacheConfiguration ccfg = configuration(ORG_CACHE, orgCacheMode);
 
             QueryEntity entity = new QueryEntity();
             entity.setKeyType(Integer.class.getName());
@@ -131,7 +143,7 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
         }
 
         {
-            CacheConfiguration ccfg = configuration(ACCOUNT_CACHE);
+            CacheConfiguration ccfg = configuration(ACCOUNT_CACHE, 
accCacheMode);
 
             QueryEntity entity = new QueryEntity();
             entity.setKeyType(Integer.class.getName());
@@ -175,18 +187,36 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
     /**
      * @throws Exception If failed.
      */
-    public void testJoin() throws Exception {
-        join(true);
+    public void testJoin1() throws Exception {
+        join(true, REPLICATED, PARTITIONED, PARTITIONED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin2() throws Exception {
+        join(true, PARTITIONED, REPLICATED, PARTITIONED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin3() throws Exception {
+        join(true, PARTITIONED, PARTITIONED, REPLICATED);
     }
 
     /**
      * @param idx Use index flag.
+     * @param persCacheMode Person cache mode.
+     * @param orgCacheMode Organization cache mode.
+     * @param accCacheMode Account cache mode.
      * @throws Exception If failed.
      */
-    private void join(boolean idx) throws Exception {
+    private void join(boolean idx, CacheMode persCacheMode, CacheMode 
orgCacheMode, CacheMode accCacheMode)
+        throws Exception {
         Ignite client = grid(2);
 
-        for (CacheConfiguration ccfg : caches(idx))
+        for (CacheConfiguration ccfg : caches(idx, persCacheMode, 
orgCacheMode, accCacheMode))
             client.createCache(ccfg);
 
         try {
@@ -217,21 +247,31 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
             accCache.put(keyForNode(aff, accKey, node0), new Account(pid, 
orgId1, "a0"));
             accCache.put(keyForNode(aff, accKey, node1), new Account(pid, 
orgId1, "a1"));
 
+            IgniteCache<Object, Object> qryCache = replicated(orgCache) ? 
personCache : orgCache;
+
             checkQuery("select p._key, p.name, a.name " +
                 "from \"person\".Person p, \"acc\".Account a " +
-                "where p._key = a.personId", orgCache, true, 2);
+                "where p._key = a.personId", qryCache, false, 2);
 
             checkQuery("select o.name, p._key, p.name, a.name " +
                 "from \"org\".Organization o, \"person\".Person p, 
\"acc\".Account a " +
-                "where p.orgId = o._key and p._key = a.personId", orgCache, 
true, 2);
+                "where p.orgId = o._key and p._key = a.personId", qryCache, 
false, 2);
 
             checkQuery("select o.name, p._key, p.name, a.name " +
                 "from \"org\".Organization o, \"acc\".Account a, 
\"person\".Person p " +
-                "where p.orgId = o._key and p._key = a.personId", orgCache, 
false, 2);
+                "where p.orgId = o._key and p._key = a.personId", qryCache, 
false, 2);
 
             checkQuery("select o.name, p._key, p.name, a.name " +
                 "from \"person\".Person p, \"org\".Organization o, 
\"acc\".Account a " +
-                "where p.orgId = o._key and p._key = a.personId", orgCache, 
true, 2);
+                "where p.orgId = o._key and p._key = a.personId", qryCache, 
false, 2);
+
+            checkQuery("select * from (select o.name n1, p._key, p.name n2, 
a.name n3 " +
+                "from \"acc\".Account a, \"person\".Person p, 
\"org\".Organization o " +
+                "where p.orgId = o._key and p._key = a.personId)", qryCache, 
false, 2);
+
+            checkQuery("select * from (select o.name n1, p._key, p.name n2, 
a.name n3 " +
+                "from \"person\".Person p, \"acc\".Account a, 
\"org\".Organization o " +
+                "where p.orgId = o._key and p._key = a.personId)", qryCache, 
false, 2);
 
             String[] cacheNames = {"\"org\".Organization o", 
"\"person\".Person p", "\"acc\".Account a"};
 
@@ -254,7 +294,7 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
                             append(cache3).append(" ").
                             append("where p.orgId = o._key and p._key = 
a.personId");
 
-                        checkQuery(qry.toString(), orgCache, false, 2);
+                        checkQuery(qry.toString(), qryCache, false, 2);
                     }
                 }
             }
@@ -302,6 +342,14 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
     }
 
     /**
+     * @param cache Cache.
+     * @return {@code True} if cache is replicated.
+     */
+    private boolean replicated(IgniteCache<?, ?> cache) {
+        return cache.getConfiguration(CacheConfiguration.class).getCacheMode() 
== REPLICATED;
+    }
+
+    /**
      *
      */
     private static class Account implements Serializable {

http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
index 9eb577c..fa9032d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
@@ -262,10 +262,6 @@ public class IgniteCacheJoinPartitionedAndReplicatedCollocationTest extends Abst
                     "where p._key = a.personId and p._key=?", accCache, true, 
key);
 
                 checkQuery("select p._key, p.name, a.name " +
-                    "from \"person\".Person p left outer join \"acc\".Account 
a " +
-                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
-
-                checkQuery("select p._key, p.name, a.name " +
                     "from \"person\".Person p right outer join \"acc\".Account 
a " +
                     "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
 
@@ -273,9 +269,13 @@ public class 
IgniteCacheJoinPartitionedAndReplicatedCollocationTest extends Abst
                     "from \"acc\".Account a left outer join \"person\".Person 
p " +
                     "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
 
-                checkQuery("select p._key, p.name, a.name " +
-                    "from \"acc\".Account a right outer join \"person\".Person 
p " +
-                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
+//                checkQuery("select p._key, p.name, a.name " +
+//                    "from \"acc\".Account a right outer join 
\"person\".Person p " +
+//                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
+//
+//                checkQuery("select p._key, p.name, a.name " +
+//                    "from \"person\".Person p left outer join 
\"acc\".Account a " +
+//                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
             }
         }
         finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 0a7ea8b..f018ff8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -312,58 +312,45 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
                 " union select o.name n1, p.name n2 from \"org\".Organization 
o, \"pers\".Person2 p where p.orgId = o._key and o._key=2";
 
             String plan = (String)c1.query(new SqlFieldsQuery("explain " + 
select0)
-                .setDistributedJoins(true).setEnforceJoinOrder(true))
+                .setDistributedJoins(true))
                 .getAll().get(0).get(0);
 
-            X.println("Plan : " + plan);
+            X.println("Plan: " + plan);
 
-            assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched"));
-            assertEquals(1, StringUtils.countOccurrencesOf(plan, 
"batched:broadcast"));
-            assertEquals(2, c1.query(new 
SqlFieldsQuery(select0).setDistributedJoins(true)
-                .setEnforceJoinOrder(true)).getAll().size());
+            assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
+            assertEquals(2, c1.query(new 
SqlFieldsQuery(select0).setDistributedJoins(true)).getAll().size());
 
             String select = "select * from (" + select0 + ")";
 
             plan = (String)c1.query(new SqlFieldsQuery("explain " + select)
-                .setDistributedJoins(true).setEnforceJoinOrder(true))
+                .setDistributedJoins(true))
                 .getAll().get(0).get(0);
 
             X.println("Plan : " + plan);
 
-            assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched"));
-            assertEquals(1, StringUtils.countOccurrencesOf(plan, 
"batched:broadcast"));
-            assertEquals(2, c1.query(new 
SqlFieldsQuery(select).setDistributedJoins(true)
-                .setEnforceJoinOrder(true)).getAll().size());
+            assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
+            assertEquals(2, c1.query(new 
SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size());
 
             String select1 = "select o.name n1, p.name n2 from 
\"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1" 
+
                 " union select * from (select o.name n1, p.name n2 from 
\"org\".Organization o, \"pers\".Person2 p where p.orgId = o._key and 
o._key=2)";
 
             plan = (String)c1.query(new SqlFieldsQuery("explain " + select1)
-                .setDistributedJoins(true).setEnforceJoinOrder(true))
-                .getAll().get(0).get(0);
-
-            X.println("Plan : " + plan);
+                .setDistributedJoins(true)).getAll().get(0).get(0);
 
-            assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched"));
-            assertEquals(1, StringUtils.countOccurrencesOf(plan, 
"batched:broadcast"));
-            assertEquals(2, c1.query(new 
SqlFieldsQuery(select).setDistributedJoins(true)
-                .setEnforceJoinOrder(true)).getAll().size());
+            X.println("Plan: " + plan);
 
-            assertEquals(2, c1.query(new 
SqlFieldsQuery(select).setDistributedJoins(true)
-                .setEnforceJoinOrder(true)).getAll().size());
+            assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
+            assertEquals(2, c1.query(new 
SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size());
 
             select = "select * from (" + select1 + ")";
 
             plan = (String)c1.query(new SqlFieldsQuery("explain " + select)
-                .setDistributedJoins(true).setEnforceJoinOrder(true))
-                .getAll().get(0).get(0);
+                .setDistributedJoins(true)).getAll().get(0).get(0);
 
             X.println("Plan : " + plan);
 
-            assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched"));
-            assertEquals(1, StringUtils.countOccurrencesOf(plan, 
"batched:broadcast"));
-            assertEquals(2, c1.query(new 
SqlFieldsQuery(select).setDistributedJoins(true)
-                .setEnforceJoinOrder(true)).getAll().size());
+            assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
+            assertEquals(2, c1.query(new 
SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size());
         }
         finally {
             c1.destroy();
@@ -398,6 +385,104 @@ public class IgniteSqlSplitterSelfTest extends 
GridCommonAbstractTest {
         caches.add(orgRepl);
 
         try {
+            // Join multiple.
+
+            {
+                String sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, 
\"orgPart\".Organization o " +
+                    "where p1.affKey=p2._key and p2.orgId = o._key";
+
+                checkQueryPlanContains(persPart,
+                    true,
+                    2,
+                    sql,
+                    "batched:unicast", "batched:unicast");
+
+                checkQueryPlanContains(persPart,
+                    false,
+                    2,
+                    sql,
+                    "batched:unicast", "batched:unicast");
+            }
+
+            {
+                String sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, 
\"orgPart\".Organization o " +
+                    "where p1.affKey > p2._key and p2.orgId = o._key";
+
+                checkQueryPlanContains(persPart,
+                    true,
+                    2,
+                    sql,
+                    "batched:broadcast", "batched:unicast");
+
+                checkQueryPlanContains(persPart,
+                    false,
+                    2,
+                    sql,
+                    "batched:broadcast", "batched:unicast");
+            }
+
+            {
+                // First join is collocated, second is replicated.
+
+                String sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, 
\"orgRepl\".Organization o " +
+                    "where p1.affKey=p2._key and p2.orgId = o._key";
+
+                checkQueryPlanContains(persPart,
+                    true,
+                    0,
+                    sql);
+
+                checkQueryPlanContains(persPart,
+                    false,
+                    0,
+                    sql);
+            }
+
+            {
+                String sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, 
\"orgRepl\".Organization o " +
+                    "where p1._key=p2._key and p2.orgId = o._key";
+
+                checkQueryPlanContains(persPart,
+                    false,
+                    1,
+                    sql,
+                    "batched:unicast");
+
+                sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"orgRepl\".Organization o, \"persPartAff\".Person2 
p1, \"persPart\".Person2 p2 " +
+                    "where p1._key=p2._key and p2.orgId = o._key";
+
+                checkQueryPlanContains(persPart,
+                    false,
+                    1,
+                    sql,
+                    "batched:unicast");
+
+                sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"persPartAff\".Person2 p1, \"orgRepl\".Organization 
o, \"persPart\".Person2 p2 " +
+                    "where p1._key=p2._key and p2.orgId = o._key";
+
+                checkQueryPlanContains(persPart,
+                    false,
+                    1,
+                    sql,
+                    "batched:unicast");
+
+                sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from (select * from \"orgRepl\".Organization) o, 
\"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
+                    "where p1._key=p2._key and p2.orgId = o._key";
+
+                checkQueryPlanContains(persPart,
+                    false,
+                    1,
+                    sql,
+                    "batched:unicast");
+            }
+
             // Join two partitioned.
 
             checkQueryPlanContains(persPart,
@@ -476,6 +561,20 @@ public class IgniteSqlSplitterSelfTest extends 
GridCommonAbstractTest {
                 false,
                 0,
                 "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, (select * from 
\"orgRepl\".Organization) o " +
+                    "where p.orgId = o._key");
+
+            checkQueryPlanContains(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from (select * from \"orgRepl\".Organization) o, 
\"persPart\".Person2 p " +
+                    "where p.orgId = o._key");
+
+            checkQueryPlanContains(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
                     "from \"persPart\".Person2 p inner join 
\"orgRepl\".Organization o " +
                     "on p.orgId = o._key");
 
@@ -487,28 +586,26 @@ public class IgniteSqlSplitterSelfTest extends 
GridCommonAbstractTest {
                     "on p.orgId = o._key");
 
             checkQueryPlanContains(persPart,
-                true,
-                1,
+                false,
+                0,
                 "select p._key k1, o._key k2 " +
                     "from \"orgRepl\".Organization o, \"persPart\".Person2 p " 
+
-                    "where p.orgId = o._key",
-                "batched:broadcast");
+                    "where p.orgId = o._key");
 
             checkQueryPlanContains(persPart,
-                true,
-                1,
+                false,
+                0,
                 "select p._key k1, o._key k2 " +
                     "from \"orgRepl\".Organization o inner join 
\"persPart\".Person2 p " +
-                    "on p.orgId = o._key",
-                "batched:broadcast");
+                    "on p.orgId = o._key");
 
-            checkQueryPlanContains(persPart,
-                true,
-                1,
-                "select p._key k1, o._key k2 " +
-                    "from \"orgRepl\".Organization o left outer join 
\"persPart\".Person2 p " +
-                    "on p.orgId = o._key",
-                "batched:broadcast");
+//            checkQueryPlanContains(persPart,
+//                true,
+//                1,
+//                "select p._key k1, o._key k2 " +
+//                    "from \"orgRepl\".Organization o left outer join 
\"persPart\".Person2 p " +
+//                    "on p.orgId = o._key",
+//                "batched:broadcast");
 
             // Join on affinity keys.
 
@@ -623,7 +720,7 @@ public class IgniteSqlSplitterSelfTest extends 
GridCommonAbstractTest {
         int startIdx = 0;
 
         for (String exp : expText) {
-            int idx = exp.indexOf(exp, startIdx);
+            int idx = plan.indexOf(exp, startIdx);
 
             if (idx == -1) {
                 fail("Plan does not contain expected string [startIdx=" + 
startIdx +

Reply via email to