Repository: ignite
Updated Branches:
  refs/heads/ignite-1232 39c5068ee -> 1fe1e579b


ignite-1232


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1fe1e579
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1fe1e579
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1fe1e579

Branch: refs/heads/ignite-1232
Commit: 1fe1e579b2fd3f4d2122eae71f98babcc64412bb
Parents: 39c5068
Author: sboikov <[email protected]>
Authored: Tue Jul 12 12:28:58 2016 +0300
Committer: sboikov <[email protected]>
Committed: Tue Jul 12 17:52:17 2016 +0300

----------------------------------------------------------------------
 .../jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java  |   2 +-
 .../junits/common/GridCommonAbstractTest.java   |  17 +
 .../processors/query/h2/IgniteH2Indexing.java   |   5 +
 .../query/h2/opt/GridH2IndexBase.java           |   4 +-
 .../query/h2/opt/GridH2RowDescriptor.java       |   8 +-
 .../processors/query/h2/opt/GridH2Table.java    |   4 +-
 .../query/h2/opt/GridH2TreeIndex.java           |   3 +
 .../cache/GridCacheCrossCacheQuerySelfTest.java |   3 +
 ...acheDistributedJoinCollocatedAndNotTest.java |  11 +-
 .../IgniteCacheDistributedJoinNoIndexTest.java  | 262 ++++++++++++++++
 ...ributedJoinPartitionedAndReplicatedTest.java |   6 +-
 ...CacheDistributedJoinQueryConditionsTest.java |  18 +-
 .../cache/IgniteCacheJoinNoIndexTest.java       | 248 ---------------
 ...IgniteCacheJoinQueryWithAffinityKeyTest.java |   4 +
 .../query/IgniteSqlSplitterSelfTest.java        | 307 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   6 +-
 16 files changed, 629 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java
index 9392cc1..37352e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java
@@ -51,7 +51,7 @@ public class TcpDiscoveryJdbcIpFinderSelfTest extends
         dataSrc.setDriverClass("org.h2.Driver");
 
         if (initSchema)
-            dataSrc.setJdbcUrl("jdbc:h2:mem");
+            dataSrc.setJdbcUrl("jdbc:h2:mem:./test");
         else {
             
dataSrc.setJdbcUrl("jdbc:h2:mem:jdbc_ipfinder_not_initialized_schema");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index e73c470..f41f4c5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -1193,4 +1194,20 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
                 return next;
         }
     }
+
+    /**
+     * @param cache Cache.
+     * @param qry Query.
+     * @return Query plan.
+     */
+    protected final String queryPlan(IgniteCache<?, ?> cache, SqlFieldsQuery 
qry) {
+        return (String)cache.query(new SqlFieldsQuery("explain " + 
qry.getSql())
+            .setArgs(qry.getArgs())
+            .setLocal(qry.isLocal())
+            .setCollocated(qry.isCollocated())
+            .setPageSize(qry.getPageSize())
+            .setDistributedJoins(qry.isDistributedJoins())
+            .setEnforceJoinOrder(qry.isEnforceJoinOrder()))
+            .getAll().get(0).get(0);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 8dcccb6..5d25dd4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2738,6 +2738,11 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         }
 
         /** {@inheritDoc} */
+        @Override public CacheConfiguration configuration() {
+            return schema.ccfg;
+        }
+
+        /** {@inheritDoc} */
         @Override public GridUnsafeGuard guard() {
             return guard;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 97eb162..9ee417b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -214,10 +214,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
         if (qctx != null) {
             if (!tbl.isPartitioned()) {
-                if (qctx.queryType() == REPLICATED || qctx.queryType() == 
LOCAL)
+                if (tblFilter == null || qctx.queryType() == REPLICATED || 
qctx.queryType() == LOCAL)
                     return null;
 
-                if (tblFilter == null || tblFilter != 
tblFilter.getSelect().getTopFilters().get(0))
+                if (tblFilter != tblFilter.getSelect().getTopFilters().get(0))
                     return null;
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index aaa2848..f519c30 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
@@ -51,7 +52,12 @@ public interface GridH2RowDescriptor extends 
GridOffHeapSmartPointerFactory<Grid
      *
      * @return Cache context.
      */
-    public GridCacheContext<?,?> context();
+    public GridCacheContext<?, ?> context();
+
+    /**
+     * @return Cache configuration.
+     */
+    public CacheConfiguration configuration();
 
     /**
      * Creates new row.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 012fb0b..d240c40 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -31,7 +31,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.h2.api.TableEngine;
 import org.h2.command.ddl.CreateTableData;
@@ -58,6 +57,7 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.LongAdder8;
 
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
@@ -156,7 +156,7 @@ public class GridH2Table extends TableBase {
      * @return {@code true} If this is a partitioned table.
      */
     public boolean isPartitioned() {
-        return desc != null && desc.context().isPartitioned();
+        return desc != null && desc.configuration().getCacheMode() == 
PARTITIONED;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 28ca4bd..6ac6645 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -1253,6 +1253,9 @@ public class GridH2TreeIndex extends GridH2IndexBase 
implements Comparator<GridS
         /** {@inheritDoc} */
         @SuppressWarnings("ForLoopReplaceableByForEach")
         @Override public boolean addSearchRows(SearchRow firstRow, SearchRow 
lastRow) {
+            if (firstRow == null && lastRow == null)
+                throw new CacheException("Failed to executed distributed join, 
index is not used for join condition.");
+
             if (findCalled) {
                 findCalled = false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 1f10593..eb84fd3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -131,6 +131,7 @@ public class GridCacheCrossCacheQuerySelfTest extends 
GridCommonAbstractTest {
 
         SqlFieldsQuery qry = new SqlFieldsQuery("select f.productId, p.name, 
f.price " +
             "from FactPurchase f, \"replicated\".DimProduct p where p.id = 
f.productId ");
+        qry.setEnforceJoinOrder(true);
 
         for (List<?> o : qryProc.queryTwoStep(cache.context(), qry).getAll()) {
             X.println("___ -> " + o);
@@ -164,6 +165,7 @@ public class GridCacheCrossCacheQuerySelfTest extends 
GridCommonAbstractTest {
             "from FactPurchase f, \"replicated\".DimProduct p " +
             "where p.id = f.productId " +
             "group by f.productId, p.name");
+        qry.setEnforceJoinOrder(true);
 
         for (List<?> o : qryProc.queryTwoStep(cache.context(), qry).getAll()) {
             X.println("___ -> " + o);
@@ -181,6 +183,7 @@ public class GridCacheCrossCacheQuerySelfTest extends 
GridCommonAbstractTest {
             "where p.id = f.productId " +
             "group by f.productId, p.name " +
             "having s >= 15");
+        qry.setEnforceJoinOrder(true);
 
         for (List<?> o : qryProc.queryTwoStep(cache.context(), qry).getAll()) {
             X.println("___ -> " + o);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java
index 705a898..9d3d329 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -93,6 +94,7 @@ public class IgniteCacheDistributedJoinCollocatedAndNotTest 
extends GridCommonAb
             entity.setKeyType(Integer.class.getName());
             entity.setValueType(Organization.class.getName());
             entity.addQueryField("name", String.class.getName(), null);
+            entity.setIndexes(F.asList(new QueryIndex("name")));
 
             ccfg.setQueryEntities(F.asList(entity));
 
@@ -107,6 +109,7 @@ public class IgniteCacheDistributedJoinCollocatedAndNotTest 
extends GridCommonAb
             entity.setValueType(Account.class.getName());
             entity.addQueryField("personId", Integer.class.getName(), null);
             entity.addQueryField("name", String.class.getName(), null);
+            entity.setIndexes(F.asList(new QueryIndex("personId"), new 
QueryIndex("name")));
 
             ccfg.setQueryEntities(F.asList(entity));
 
@@ -195,14 +198,6 @@ public class 
IgniteCacheDistributedJoinCollocatedAndNotTest extends GridCommonAb
 
         checkQuery(qry, orgCache, false, 2);
 
-        qry = "select o.name, p._key, p.name " +
-            "from \"org\".Organization o, \"person\".Person p " +
-            "where p.affKey != o._key";
-
-        assertTrue(plan(qry, orgCache, false).contains("batched"));
-
-        checkQuery(qry, orgCache, false, 0);
-
         checkQuery("select o.name, p._key, p.name, a.name " +
             "from \"org\".Organization o, \"person\".Person p, \"acc\".Account 
a " +
             "where p.affKey = o._key and p.id = a.personId", orgCache, true, 
2);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java
new file mode 100644
index 0000000..51f47aa
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheDistributedJoinNoIndexTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String PERSON_CACHE = "person";
+
+    /** */
+    private static final String ORG_CACHE = "org";
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        {
+            CacheConfiguration ccfg = configuration(PERSON_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Person.class.getName());
+            entity.addQueryField("orgId", Integer.class.getName(), null);
+            entity.addQueryField("orgName", String.class.getName(), null);
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        {
+            CacheConfiguration ccfg = configuration(ORG_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Organization.class.getName());
+            entity.addQueryField("name", String.class.getName(), null);
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        cfg.setCacheConfiguration(ccfgs.toArray(new 
CacheConfiguration[ccfgs.size()]));
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration configuration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setBackups(0);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(2);
+
+        client = true;
+
+        startGrid(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin() throws Exception {
+        Ignite client = grid(2);
+
+        Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+        final IgniteCache<Object, Object> personCache = 
client.cache(PERSON_CACHE);
+        IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+
+        AtomicInteger pKey = new AtomicInteger(100_000);
+        AtomicInteger orgKey = new AtomicInteger();
+
+        ClusterNode node0 = ignite(0).cluster().localNode();
+        ClusterNode node1 = ignite(1).cluster().localNode();
+
+        for (int i = 0; i < 3; i++) {
+            int orgId = keyForNode(aff, orgKey, node0);
+
+            orgCache.put(orgId, new Organization("org-" + i));
+
+            for (int j = 0; j < i; j++)
+                personCache.put(keyForNode(aff, pKey, node1), new 
Person(orgId, "org-" + i));
+        }
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                SqlFieldsQuery qry = new SqlFieldsQuery("select o.name, 
p._key, p.orgName " +
+                    "from \"org\".Organization o, \"person\".Person p " +
+                    "where p.orgName = o.name");
+
+                qry.setDistributedJoins(true);
+
+                personCache.query(qry).getAll();
+
+                return null;
+            }
+        }, CacheException.class, null);
+    }
+    
+    /**
+     * @param sql SQL.
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param expSize Expected results size.
+     * @param args Arguments.
+     */
+    private void checkQuery(String sql,
+        IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        int expSize,
+        Object... args) {
+        String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql)
+            .setDistributedJoins(true)
+            .setEnforceJoinOrder(enforceJoinOrder))
+            .getAll().get(0).get(0);
+
+        log.info("Plan: " + plan);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        qry.setDistributedJoins(true);
+        qry.setEnforceJoinOrder(enforceJoinOrder);
+        qry.setArgs(args);
+
+        QueryCursor<List<?>> cur = cache.query(qry);
+
+        List<List<?>> res = cur.getAll();
+
+        if (expSize != res.size())
+            log.info("Results: " + res);
+
+        assertEquals(expSize, res.size());
+    }
+
+    /**
+     *
+     */
+    private static class Person implements Serializable {
+        /** */
+        int orgId;
+
+        /** */
+        String orgName;
+
+        /**
+         * @param orgId Organization ID.
+         * @param orgName Organization name.
+         */
+        public Person(int orgId, String orgName) {
+            this.orgId = orgId;
+            this.orgName = orgName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Organization implements Serializable {
+        /** */
+        String name;
+
+        /**
+         * @param name Name.
+         */
+        public Organization(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Organization.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
index 88f0d21..41014ea 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
@@ -177,8 +177,6 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
      */
     public void testJoin() throws Exception {
         join(true);
-
-        join(false);
     }
 
     /**
@@ -229,7 +227,7 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
 
             checkQuery("select o.name, p._key, p.name, a.name " +
                 "from \"org\".Organization o, \"acc\".Account a, 
\"person\".Person p " +
-                "where p.orgId = o._key and p._key = a.personId", orgCache, 
true, 2);
+                "where p.orgId = o._key and p._key = a.personId", orgCache, 
false, 2);
 
             checkQuery("select o.name, p._key, p.name, a.name " +
                 "from \"person\".Person p, \"org\".Organization o, 
\"acc\".Account a " +
@@ -256,8 +254,6 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
                             append(cache3).append(" ").
                             append("where p.orgId = o._key and p._key = 
a.personId");
 
-                        checkQuery(qry.toString(), orgCache, true, 2);
-
                         checkQuery(qry.toString(), orgCache, false, 2);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java
index 98630e9..1124067 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java
@@ -96,8 +96,6 @@ public class IgniteCacheDistributedJoinQueryConditionsTest 
extends GridCommonAbs
      */
     public void testJoinQuery1() throws Exception {
         joinQuery1(true);
-
-        joinQuery1(false);
     }
 
     /**
@@ -245,11 +243,11 @@ public class 
IgniteCacheDistributedJoinQueryConditionsTest extends GridCommonAbs
     /**
      * @throws Exception If failed.
      */
-    public void testJoinQuery3() throws Exception {
+    public void _testJoinQuery3() throws Exception {
         Ignite client = grid(2);
 
         try {
-            CacheConfiguration ccfg1 = 
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, 
false)));
+            CacheConfiguration ccfg1 = 
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, 
true)));
             CacheConfiguration ccfg2 = 
cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
 
             IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
@@ -307,7 +305,7 @@ public class IgniteCacheDistributedJoinQueryConditionsTest 
extends GridCommonAbs
 
         try {
             CacheConfiguration ccfg1 =
-                
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, 
false)));
+                
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(true, 
false)));
 
             IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
 
@@ -342,11 +340,11 @@ public class 
IgniteCacheDistributedJoinQueryConditionsTest extends GridCommonAbs
 
             checkQuery("select p1._key, p1.name, p2._key, p2.name " +
                 "from Person p1, Person p2 " +
-                "where p1.name != p2.name", pCache, 6);
-
-            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
-                "from Person p1, Person p2 " +
                 "where p1.name > p2.name", pCache, 3);
+
+//            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+//                "from Person p1, Person p2 " +
+//                "where p1.name != p2.name", pCache, 6);
         }
         finally {
             client.destroyCache(PERSON_CACHE);
@@ -361,7 +359,7 @@ public class IgniteCacheDistributedJoinQueryConditionsTest 
extends GridCommonAbs
         Ignite client = grid(2);
 
         try {
-            CacheConfiguration ccfg1 = 
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, 
false)));
+            CacheConfiguration ccfg1 = 
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, 
true)));
             CacheConfiguration ccfg2 = 
cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
 
             IgniteCache<Object, Object> pCache = client.createCache(ccfg1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java
deleted file mode 100644
index 6a97033..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.QueryEntity;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
-/**
- *
- */
-public class IgniteCacheJoinNoIndexTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final String PERSON_CACHE = "person";
-
-    /** */
-    private static final String ORG_CACHE = "org";
-
-    /** */
-    private boolean client;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
-
-        spi.setIpFinder(IP_FINDER);
-
-        List<CacheConfiguration> ccfgs = new ArrayList<>();
-
-        {
-            CacheConfiguration ccfg = configuration(PERSON_CACHE);
-
-            QueryEntity entity = new QueryEntity();
-            entity.setKeyType(Integer.class.getName());
-            entity.setValueType(Person.class.getName());
-            entity.addQueryField("orgId", Integer.class.getName(), null);
-            entity.addQueryField("orgName", String.class.getName(), null);
-
-            ccfg.setQueryEntities(F.asList(entity));
-
-            ccfgs.add(ccfg);
-        }
-
-        {
-            CacheConfiguration ccfg = configuration(ORG_CACHE);
-
-            QueryEntity entity = new QueryEntity();
-            entity.setKeyType(Integer.class.getName());
-            entity.setValueType(Organization.class.getName());
-            entity.addQueryField("name", String.class.getName(), null);
-
-            ccfg.setQueryEntities(F.asList(entity));
-
-            ccfgs.add(ccfg);
-        }
-
-        cfg.setCacheConfiguration(ccfgs.toArray(new 
CacheConfiguration[ccfgs.size()]));
-
-        cfg.setClientMode(client);
-
-        return cfg;
-    }
-
-    /**
-     * @param name Cache name.
-     * @return Cache configuration.
-     */
-    private CacheConfiguration configuration(String name) {
-        CacheConfiguration ccfg = new CacheConfiguration();
-
-        ccfg.setName(name);
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
-        ccfg.setAtomicWriteOrderMode(PRIMARY);
-        ccfg.setAtomicityMode(ATOMIC);
-        ccfg.setBackups(0);
-
-        return ccfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGridsMultiThreaded(2);
-
-        client = true;
-
-        startGrid(2);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-
-        super.afterTestsStopped();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testJoin() throws Exception {
-        Ignite client = grid(2);
-
-        Affinity<Object> aff = client.affinity(PERSON_CACHE);
-
-        IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
-        IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
-
-        AtomicInteger pKey = new AtomicInteger(100_000);
-        AtomicInteger orgKey = new AtomicInteger();
-
-        ClusterNode node0 = ignite(0).cluster().localNode();
-        ClusterNode node1 = ignite(1).cluster().localNode();
-
-        for (int i = 0; i < 3; i++) {
-            int orgId = keyForNode(aff, orgKey, node0);
-
-            orgCache.put(orgId, new Organization("org-" + i));
-
-            for (int j = 0; j < i; j++)
-                personCache.put(keyForNode(aff, pKey, node1), new 
Person(orgId, "org-" + i));
-        }
-
-        checkQuery("select o.name, p._key, p.orgName " +
-            "from \"org\".Organization o, \"person\".Person p " +
-            "where p.orgName = o.name", personCache, false, 3);
-    }
-    
-    /**
-     * @param sql SQL.
-     * @param cache Cache.
-     * @param enforceJoinOrder Enforce join order flag.
-     * @param expSize Expected results size.
-     * @param args Arguments.
-     */
-    private void checkQuery(String sql,
-        IgniteCache<Object, Object> cache,
-        boolean enforceJoinOrder,
-        int expSize,
-        Object... args) {
-        String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql)
-            .setDistributedJoins(true)
-            .setEnforceJoinOrder(enforceJoinOrder))
-            .getAll().get(0).get(0);
-
-        log.info("Plan: " + plan);
-
-        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
-
-        qry.setDistributedJoins(true);
-        qry.setEnforceJoinOrder(enforceJoinOrder);
-        qry.setArgs(args);
-
-        QueryCursor<List<?>> cur = cache.query(qry);
-
-        List<List<?>> res = cur.getAll();
-
-        if (expSize != res.size())
-            log.info("Results: " + res);
-
-        assertEquals(expSize, res.size());
-    }
-
-    /**
-     *
-     */
-    private static class Person implements Serializable {
-        /** */
-        int orgId;
-
-        /** */
-        String orgName;
-
-        /**
-         * @param orgId Organization ID.
-         * @param orgName Organization name.
-         */
-        public Person(int orgId, String orgName) {
-            this.orgId = orgId;
-            this.orgName = orgName;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Person.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    private static class Organization implements Serializable {
-        /** */
-        String name;
-
-        /**
-         * @param name Name.
-         */
-        public Organization(String name) {
-            this.name = name;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Organization.class, this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java
index dfc450c..d27fe1b 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheKeyConfiguration;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -296,6 +297,7 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest 
extends GridCommonAbstractT
         account.setValueType(affKey ? AccountKeyWithAffinity.class.getName() : 
Account.class.getName());
         account.addQueryField("personKey", personKeyType, null);
         account.addQueryField("personId", Integer.class.getName(), null);
+        account.setIndexes(F.asList(new QueryIndex("personKey"), new 
QueryIndex("personId")));
 
         QueryEntity person = new QueryEntity();
         person.setKeyType(personKeyType);
@@ -303,6 +305,7 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest 
extends GridCommonAbstractT
         person.addQueryField("orgId", Integer.class.getName(), null);
         person.addQueryField("id", Integer.class.getName(), null);
         person.addQueryField("name", String.class.getName(), null);
+        person.setIndexes(F.asList(new QueryIndex("orgId"), new 
QueryIndex("id"), new QueryIndex("name")));
 
         if (affKey && includeAffKey)
             person.addQueryField("affKey", Integer.class.getName(), null);
@@ -311,6 +314,7 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest 
extends GridCommonAbstractT
         org.setKeyType(Integer.class.getName());
         org.setValueType(Organization.class.getName());
         org.addQueryField("name", String.class.getName(), null);
+        org.setIndexes(F.asList(new QueryIndex("name")));
 
         ccfg.setQueryEntities(F.asList(account, person, org));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index fe209d6..0a7ea8b 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -371,6 +372,270 @@ public class IgniteSqlSplitterSelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Creates several caches on node 0 and checks, through {@code checkQueryPlanContains} and
+     * {@code checkNoBatchedJoin}, the plans of a series of join queries executed from them.
+     * All created caches are destroyed in the {@code finally} block.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDistributedJoinsPlan() throws Exception {
+        // Every created cache is remembered here so that the finally block can destroy it.
+        List<IgniteCache<Object, Object>> caches = new ArrayList<>();
+
+        IgniteCache<Object, Object> persPart =
+            ignite(0).createCache(cacheConfig("persPart", true, Integer.class, 
Person2.class));
+        caches.add(persPart);
+
+        IgniteCache<Object, Object> persPartAff =
+            ignite(0).createCache(cacheConfig("persPartAff", true, 
TestKey.class, Person2.class));
+        caches.add(persPartAff);
+
+        IgniteCache<Object, Object> orgPart =
+            ignite(0).createCache(cacheConfig("orgPart", true, Integer.class, 
Organization.class));
+        caches.add(orgPart);
+
+        IgniteCache<Object, Object> orgPartAff =
+            ignite(0).createCache(cacheConfig("orgPartAff", true, 
TestKey.class, Organization.class));
+        caches.add(orgPartAff);
+
+        IgniteCache<Object, Object> orgRepl =
+            ignite(0).createCache(cacheConfig("orgRepl", false, Integer.class, 
Organization.class));
+        caches.add(orgRepl);
+
+        // NOTE(review): the caches above are created outside of this try block, so if one of the
+        // later createCache() calls fails, the caches created before it are not destroyed here.
+        try {
+            // Join two partitioned.
+
+            checkQueryPlanContains(persPart,
+                true,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, \"orgPart\".Organization o " 
+
+                    "where p.orgId = o._key",
+                "batched:unicast");
+
+            checkQueryPlanContains(persPart,
+                false,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, \"orgPartAff\".Organization 
o " +
+                    "where p.orgId = o.affKey",
+                "batched:unicast");
+
+            checkQueryPlanContains(persPart,
+                false,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, \"orgPart\".Organization o " 
+
+                    "where p.orgId = o._key",
+                "batched:unicast");
+
+            checkQueryPlanContains(persPart,
+                false,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p inner join 
\"orgPart\".Organization o " +
+                    "on p.orgId = o._key",
+                "batched:unicast");
+
+            checkQueryPlanContains(persPart,
+                false,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p left outer join 
\"orgPart\".Organization o " +
+                    "on p.orgId = o._key",
+                "batched:unicast");
+
+            checkQueryPlanContains(persPart,
+                true,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"orgPart\".Organization o, \"persPart\".Person2 p " 
+
+                    "where p.orgId = o._key",
+                "batched:broadcast");
+
+            checkQueryPlanContains(persPart,
+                true,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"orgPartAff\".Organization o, \"persPart\".Person2 
p " +
+                    "where p.orgId = o.affKey",
+                "batched:broadcast");
+
+            // Join partitioned and replicated.
+            // Calls that expect 0 batched joins pass no expected plan text.
+
+            checkQueryPlanContains(persPart,
+                true,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, \"orgRepl\".Organization o " 
+
+                    "where p.orgId = o._key");
+
+            checkQueryPlanContains(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, \"orgRepl\".Organization o " 
+
+                    "where p.orgId = o._key");
+
+            checkQueryPlanContains(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p inner join 
\"orgRepl\".Organization o " +
+                    "on p.orgId = o._key");
+
+            checkQueryPlanContains(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p left outer join 
\"orgRepl\".Organization o " +
+                    "on p.orgId = o._key");
+
+            checkQueryPlanContains(persPart,
+                true,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"orgRepl\".Organization o, \"persPart\".Person2 p " 
+
+                    "where p.orgId = o._key",
+                "batched:broadcast");
+
+            checkQueryPlanContains(persPart,
+                true,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"orgRepl\".Organization o inner join 
\"persPart\".Person2 p " +
+                    "on p.orgId = o._key",
+                "batched:broadcast");
+
+            checkQueryPlanContains(persPart,
+                true,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"orgRepl\".Organization o left outer join 
\"persPart\".Person2 p " +
+                    "on p.orgId = o._key",
+                "batched:broadcast");
+
+            // Join on affinity keys.
+
+            checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
+                "\"persPart\".Person2 p",
+                "\"orgPart\".Organization o",
+                "where p._key = o._key");
+
+            checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
+                "\"persPart\".Person2 p",
+                "\"orgRepl\".Organization o",
+                "where p._key = o._key");
+
+            checkNoBatchedJoin(persPartAff, "select p._key k1, o._key k2 ",
+                "\"persPartAff\".Person2 p",
+                "\"orgPart\".Organization o",
+                "where p.affKey = o._key");
+
+            checkNoBatchedJoin(persPartAff, "select p._key k1, o._key k2 ",
+                "\"persPartAff\".Person2 p",
+                "\"orgRepl\".Organization o",
+                "where p.affKey = o._key");
+        }
+        finally {
+            for (IgniteCache<Object, Object> cache : caches)
+                ignite(0).destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * Builds a two-table join query in both table orders and, for each order, checks its plan
+     * with join order enforcement switched on and then off, always expecting zero batched joins.
+     *
+     * @param cache Query cache.
+     * @param select Select clause; it is concatenated directly in front of {@code "from "}.
+     * @param cache1 Cache name1.
+     * @param cache2 Cache name2.
+     * @param where Where clause.
+     */
+    private void checkNoBatchedJoin(IgniteCache<Object, Object> cache,
+        String select,
+        String cache1,
+        String cache2,
+        String where) {
+        // Same query text for both table orders: cache1 first, then cache2 first.
+        String[] sqls = {
+            select + "from " + cache1 + "," + cache2 + " " + where,
+            select + "from " + cache2 + "," + cache1 + " " + where
+        };
+
+        for (String sql : sqls) {
+            checkQueryPlanContains(cache, true, 0, sql);
+            checkQueryPlanContains(cache, false, 0, sql);
+        }
+    }
+
+    /**
+     * Checks the plan of the given SQL twice: first for the query as is, then for the same query
+     * wrapped into an outer {@code select * from (...)}.
+     *
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param expBatchedJoins Expected batched joins count.
+     * @param sql Query.
+     * @param expText Expected text to find in plan.
+     */
+    private void checkQueryPlanContains(IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        int expBatchedJoins,
+        String sql,
+        String...expText) {
+        SqlFieldsQuery plainQry = new SqlFieldsQuery(sql);
+
+        checkQueryPlanContains(cache, enforceJoinOrder, expBatchedJoins, plainQry, expText);
+
+        // The same expectations must hold when the query is used as a sub-query.
+        SqlFieldsQuery nestedQry = new SqlFieldsQuery("select * from (" + sql + ")");
+
+        checkQueryPlanContains(cache, enforceJoinOrder, expBatchedJoins, nestedQry, expText);
+    }
+
+    /**
+     * Obtains the plan of the query through {@code queryPlan(cache, qry)} with distributed joins
+     * enabled and verifies it: the number of "batched" occurrences in the plan (as counted by
+     * {@code StringUtils.countOccurrencesOf}) must equal {@code expBatchedJoins}, and every
+     * expected text must be found in the plan, each search starting right after the start of
+     * the previous match.
+     *
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param expBatchedJoins Expected batched joins count.
+     * @param qry Query; its enforce-join-order and distributed-joins flags are overwritten here.
+     * @param expText Expected text to find in plan.
+     */
+    private void checkQueryPlanContains(IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        int expBatchedJoins,
+        SqlFieldsQuery qry,
+        String...expText) {
+        qry.setEnforceJoinOrder(enforceJoinOrder);
+        qry.setDistributedJoins(true);
+
+        String plan = queryPlan(cache, qry);
+
+        log.info("Plan: " + plan);
+
+        assertEquals("Unexpected number of batched joins in plan [plan=" +
+                plan + ", qry=" + qry + ']',
+            expBatchedJoins,
+            StringUtils.countOccurrencesOf(plan, "batched"));
+
+        int startIdx = 0;
+
+        for (String exp : expText) {
+            // Search inside the plan text. Searching the expected string inside itself
+            // matches at index 0 for any input and would make this check vacuous.
+            int idx = plan.indexOf(exp, startIdx);
+
+            if (idx == -1) {
+                fail("Plan does not contain expected string [startIdx=" +
+                    startIdx +
+                    ", plan=" + plan +
+                    ", exp=" + exp + ']');
+            }
+
+            startIdx = idx + 1;
+        }
+    }
+
+    /**
      * Test HAVING clause.
      */
     public void testHaving() {
@@ -658,6 +923,48 @@ public class IgniteSqlSplitterSelfTest extends 
GridCommonAbstractTest {
     /**
      *
      */
+    private static class TestKey implements Serializable {
+        /** Affinity key part; indexed SQL field. */
+        @QuerySqlField(index = true)
+        @AffinityKeyMapped
+        int affKey;
+
+        /** ID; the only field used by {@link #equals(Object)} and {@link #hashCode()}. */
+        @QuerySqlField()
+        int id;
+
+        /**
+         * @param affKey Affinity key.
+         * @param id ID.
+         */
+        public TestKey(int affKey, int id) {
+            this.affKey = affKey;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            // Only an object of exactly this class can be equal; 'affKey' is not compared.
+            if (o != null && o.getClass() == getClass())
+                return ((TestKey)o).id == id;
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+
+    /**
+     *
+     */
     private static class Organization implements Serializable {
         /** */
         @QuerySqlField

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index bd39c49..50bf51c 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -27,10 +27,11 @@ import 
org.apache.ignite.internal.processors.cache.IgniteBinaryWrappedObjectFiel
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheCollocatedQuerySelfTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheCrossCacheJoinRandomTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinCollocatedAndNotTest;
+import 
org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinCustomAffinityMapper;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinPartitionedAndReplicatedTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinQueryConditionsTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheDuplicateEntityConfigurationSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheJoinNoIndexTest;
+import 
org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinNoIndexTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheJoinPartitionedAndReplicatedTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheJoinQueryWithAffinityKeyTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheLargeResultSelfTest;
@@ -129,7 +130,8 @@ public class IgniteCacheQuerySelfTestSuite extends 
TestSuite {
         
suite.addTestSuite(IgniteCacheDistributedJoinPartitionedAndReplicatedTest.class);
         
suite.addTestSuite(IgniteCacheDistributedJoinQueryConditionsTest.class);
         suite.addTestSuite(IgniteCacheJoinPartitionedAndReplicatedTest.class);
-        suite.addTestSuite(IgniteCacheJoinNoIndexTest.class);
+        suite.addTestSuite(IgniteCacheDistributedJoinNoIndexTest.class);
+        
suite.addTestSuite(IgniteCacheDistributedJoinCustomAffinityMapper.class);
         suite.addTestSuite(IgniteCrossCachesJoinsQueryTest.class);
         suite.addTestSuite(IgniteCacheCrossCacheJoinRandomTest.class);
 

Reply via email to