http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
new file mode 100644
index 0000000..d7610dc
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String PERSON_CACHE = "person";
+
+    /** */
+    private static final String ORG_CACHE = "org";
+
+    /** */
+    private static final String ACCOUNT_CACHE = "acc";
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @param cacheMode Cache mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration configuration(String name, CacheMode cacheMode) 
{
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setCacheMode(cacheMode);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(1);
+
+        return ccfg;
+    }
+
+    /**
+     * @param idx Use index flag.
+     * @param persCacheMode Person cache mode.
+     * @param orgCacheMode Organization cache mode.
+     * @param accCacheMode Account cache mode.
+     * @return Configurations.
+     */
+    private List<CacheConfiguration> caches(boolean idx,
+        CacheMode persCacheMode,
+        CacheMode orgCacheMode,
+        CacheMode accCacheMode) {
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        {
+            CacheConfiguration ccfg = configuration(PERSON_CACHE, 
persCacheMode);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Person.class.getName());
+            entity.addQueryField("orgId", Integer.class.getName(), null);
+            entity.addQueryField("name", String.class.getName(), null);
+
+            if (idx)
+                entity.setIndexes(F.asList(new QueryIndex("orgId"), new 
QueryIndex("name")));
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        {
+            CacheConfiguration ccfg = configuration(ORG_CACHE, orgCacheMode);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Organization.class.getName());
+            entity.addQueryField("name", String.class.getName(), null);
+
+            if (idx)
+                entity.setIndexes(F.asList(new QueryIndex("name")));
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        {
+            CacheConfiguration ccfg = configuration(ACCOUNT_CACHE, 
accCacheMode);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Account.class.getName());
+            entity.addQueryField("orgId", Integer.class.getName(), null);
+            entity.addQueryField("personId", Integer.class.getName(), null);
+            entity.addQueryField("name", String.class.getName(), null);
+
+            if (idx) {
+                entity.setIndexes(F.asList(new QueryIndex("orgId"),
+                    new QueryIndex("personId"),
+                    new QueryIndex("name")));
+            }
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        return ccfgs;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(2);
+
+        client = true;
+
+        startGrid(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin1() throws Exception {
+        join(true, REPLICATED, PARTITIONED, PARTITIONED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin2() throws Exception {
+        join(true, PARTITIONED, REPLICATED, PARTITIONED);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin3() throws Exception {
+        join(true, PARTITIONED, PARTITIONED, REPLICATED);
+    }
+
+    /**
+     * @param idx Use index flag.
+     * @param persCacheMode Person cache mode.
+     * @param orgCacheMode Organization cache mode.
+     * @param accCacheMode Account cache mode.
+     * @throws Exception If failed.
+     */
+    private void join(boolean idx, CacheMode persCacheMode, CacheMode 
orgCacheMode, CacheMode accCacheMode)
+        throws Exception {
+        Ignite client = grid(2);
+
+        for (CacheConfiguration ccfg : caches(idx, persCacheMode, 
orgCacheMode, accCacheMode))
+            client.createCache(ccfg);
+
+        try {
+            IgniteCache<Object, Object> personCache = 
client.cache(PERSON_CACHE);
+            IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+            IgniteCache<Object, Object> accCache = client.cache(ACCOUNT_CACHE);
+
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger pKey = new AtomicInteger(100_000);
+            AtomicInteger orgKey = new AtomicInteger();
+            AtomicInteger accKey = new AtomicInteger();
+
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
+
+            /**
+             * One organization, one person, two accounts.
+             */
+
+            {
+                int orgId1 = keyForNode(aff, orgKey, node0);
+
+                orgCache.put(orgId1, new Organization("obj-" + orgId1));
+
+                int pid1 = keyForNode(aff, pKey, node0);
+                personCache.put(pid1, new Person(orgId1, "o1-p1"));
+
+                accCache.put(keyForNode(aff, accKey, node0), new Account(pid1, 
orgId1, "a0"));
+                accCache.put(keyForNode(aff, accKey, node1), new Account(pid1, 
orgId1, "a1"));
+            }
+
+            IgniteCache<Object, Object> qryCache = replicated(orgCache) ? 
personCache : orgCache;
+
+            checkQuery("select p._key, p.name, a.name " +
+                "from \"person\".Person p, \"acc\".Account a " +
+                "where p._key = a.personId", qryCache, false, 2);
+
+            checkQuery("select o.name, p._key, p.name, a.name " +
+                "from \"org\".Organization o, \"person\".Person p, 
\"acc\".Account a " +
+                "where p.orgId = o._key and p._key = a.personId and 
a.orgId=o._key", qryCache, false, 2);
+
+            checkQuery("select o.name, p._key, p.name, a.name " +
+                "from \"org\".Organization o, \"acc\".Account a, 
\"person\".Person p " +
+                "where p.orgId = o._key and p._key = a.personId and 
a.orgId=o._key", qryCache, false, 2);
+
+            checkQuery("select o.name, p._key, p.name, a.name " +
+                "from \"person\".Person p, \"org\".Organization o, 
\"acc\".Account a " +
+                "where p.orgId = o._key and p._key = a.personId and 
a.orgId=o._key", qryCache, false, 2);
+
+            checkQuery("select * from (select o.name n1, p._key, p.name n2, 
a.name n3 " +
+                "from \"acc\".Account a, \"person\".Person p, 
\"org\".Organization o " +
+                "where p.orgId = o._key and p._key = a.personId and 
a.orgId=o._key)", qryCache, false, 2);
+
+            checkQuery("select * from (select o.name n1, p._key, p.name n2, 
a.name n3 " +
+                "from \"person\".Person p, \"acc\".Account a, 
\"org\".Organization o " +
+                "where p.orgId = o._key and p._key = a.personId and 
a.orgId=o._key)", qryCache, false, 2);
+
+            List<List<?>> res = checkQuery("select count(*) " +
+                "from \"org\".Organization o, \"person\".Person p, 
\"acc\".Account a " +
+                "where p.orgId = o._key and p._key = a.personId and 
a.orgId=o._key", qryCache, false, 1);
+
+            assertEquals(2L, res.get(0).get(0));
+
+            checkQueries(qryCache, 2);
+
+            {
+                int orgId2 = keyForNode(aff, orgKey, node1);
+
+                orgCache.put(orgId2, new Organization("obj-" + orgId2));
+
+                int pid2 = keyForNode(aff, pKey, node0);
+                personCache.put(pid2, new Person(orgId2, "o2-p1"));
+
+                accCache.put(keyForNode(aff, accKey, node0), new Account(pid2, 
orgId2, "a3"));
+                accCache.put(keyForNode(aff, accKey, node1), new Account(pid2, 
orgId2, "a4"));
+            }
+
+            checkQuery("select o.name, p._key, p.name, a.name " +
+                "from \"org\".Organization o, \"person\".Person p, 
\"acc\".Account a " +
+                "where p.orgId = o._key and p._key = a.personId and 
a.orgId=o._key", qryCache, false, 4);
+
+            checkQuery("select o.name, p._key, p.name, a.name " +
+                "from \"org\".Organization o inner join \"person\".Person p on 
p.orgId = o._key " +
+                "inner join \"acc\".Account a on p._key = a.personId and 
a.orgId=o._key", qryCache, false, 4);
+
+            res = checkQuery("select count(*) " +
+                "from \"org\".Organization o, \"person\".Person p, 
\"acc\".Account a " +
+                "where p.orgId = o._key and p._key = a.personId and 
a.orgId=o._key", qryCache, false, 1);
+
+            assertEquals(4L, res.get(0).get(0));
+
+            checkQuery("select o.name, p._key, p.name, a.name " +
+                "from \"org\".Organization o, \"person\".Person p, 
\"acc\".Account a " +
+                "where p.orgId = o._key and a.orgId = o._key and 
a.orgId=o._key", qryCache, false, 4);
+
+            res = checkQuery("select count(*) " +
+                "from \"org\".Organization o, \"person\".Person p, 
\"acc\".Account a " +
+                "where p.orgId = o._key and a.orgId = o._key and 
a.orgId=o._key", qryCache, false, 1);
+
+            assertEquals(4L, res.get(0).get(0));
+
+            checkQueries(qryCache, 4);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+            client.destroyCache(ACCOUNT_CACHE);
+        }
+    }
+
+    /**
+     * @param qryCache Query cache.
+     * @param expSize Expected results size.
+     */
+    private void checkQueries(IgniteCache<Object, Object> qryCache, int 
expSize) {
+        String[] cacheNames = {"\"org\".Organization o", "\"person\".Person 
p", "\"acc\".Account a"};
+
+        for (int c1 = 0; c1 < cacheNames.length; c1++) {
+            for (int c2 = 0; c2 < cacheNames.length; c2++) {
+                if (c2 == c1)
+                    continue;
+
+                for (int c3 = 0; c3 < cacheNames.length; c3++) {
+                    if (c3 == c1 || c3 == c2)
+                        continue;
+
+                    String cache1 = cacheNames[c1];
+                    String cache2 = cacheNames[c2];
+                    String cache3 = cacheNames[c3];
+
+                    String qry = "select o.name, p._key, p.name, a.name from " 
+
+                        cache1 + ", " +
+                        cache2 + ", " +
+                        cache3 + " " +
+                        "where p.orgId = o._key and p._key = a.personId and 
a.orgId=o._key";
+
+                    checkQuery(qry, qryCache, false, expSize);
+
+                    qry = "select o.name, p._key, p.name, a.name from " +
+                        cache1 + ", " +
+                        cache2 + ", " +
+                        cache3 + " " +
+                        "where p.orgId = o._key and a.orgId = o._key and 
a.orgId=o._key";
+
+                    checkQuery(qry, qryCache, false, expSize);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param sql SQL.
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param expSize Expected results size.
+     * @param args Arguments.
+     * @return Results.
+     */
+    private List<List<?>> checkQuery(String sql,
+        IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        int expSize,
+        Object... args) {
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        qry.setDistributedJoins(true);
+        qry.setEnforceJoinOrder(enforceJoinOrder);
+        qry.setArgs(args);
+
+        log.info("Plan: " + queryPlan(cache, qry));
+
+        QueryCursor<List<?>> cur = cache.query(qry);
+
+        List<List<?>> res = cur.getAll();
+
+        if (expSize != res.size())
+            log.info("Results: " + res);
+
+        assertEquals(expSize, res.size());
+
+        return res;
+    }
+
+    /**
+     * @param cache Cache.
+     * @return {@code True} if cache is replicated.
+     */
+    private boolean replicated(IgniteCache<?, ?> cache) {
+        return cache.getConfiguration(CacheConfiguration.class).getCacheMode() 
== REPLICATED;
+    }
+
+    /**
+     *
+     */
+    private static class Account implements Serializable {
+        /** */
+        int personId;
+
+        /** */
+        int orgId;
+
+        /** */
+        String name;
+
+        /**
+         * @param personId Person ID.
+         * @param orgId Organization ID.
+         * @param name Name.
+         */
+        public Account(int personId, int orgId, String name) {
+            this.personId = personId;
+            this.orgId = orgId;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Account.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Person implements Serializable {
+        /** */
+        int orgId;
+
+        /** */
+        String name;
+
+        /**
+         * @param orgId Organization ID.
+         * @param name Name.
+         */
+        public Person(int orgId, String name) {
+            this.orgId = orgId;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Organization implements Serializable {
+        /** */
+        String name;
+
+        /**
+         * @param name Name.
+         */
+        public Organization(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Organization.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java
new file mode 100644
index 0000000..af56c91
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheDistributedJoinQueryConditionsTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String PERSON_CACHE = "person";
+
+    /** */
+    private static final String ORG_CACHE = "org";
+
+    /** */
+    private boolean client;
+
+    /** */
+    private int total;
+
+    /** */
+    private CacheMemoryMode memMode = ONHEAP_TIERED;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi) cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(2);
+
+        client = true;
+
+        startGrid(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery1() throws Exception {
+        joinQuery1(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery1Offheap() throws Exception {
+        memMode = OFFHEAP_TIERED;
+
+        testJoinQuery1();
+    }
+
+    /**
+     * @param idx Use index flag.
+     * @throws Exception If failed.
+     */
+    private void joinQuery1(boolean idx) throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 =
+                
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(idx, 
idx)));
+            CacheConfiguration ccfg2 =
+                
cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(idx)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+            client.createCache(ccfg2);
+
+            List<Integer> orgIds = putData1();
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key", pCache, total);
+
+            checkQuery("select * from (select o._key, o.name, p._key pKey, 
p.name pName " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key)", pCache, total);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o inner join Person p " +
+                "on p.orgId = o._key", pCache, total);
+
+            checkQuery("select * from (select o._key o_key, o.name o_name, 
p._key p_key, p.name p_name " +
+                "from \"org\".Organization o inner join Person p " +
+                "on p.orgId = o._key)", pCache, total);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and o._key=" + orgIds.get(3), pCache, 
3);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and o._key IN (" + orgIds.get(2) + "," 
+ orgIds.get(3) + ")", pCache, 5);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and o._key IN (" + orgIds.get(2) + "," 
+ orgIds.get(3) + ")", pCache, 5);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and o._key > " + orgIds.get(2), 
pCache, total - 3);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and o._key > " + orgIds.get(1) + " and 
o._key < " + orgIds.get(4), pCache, 5);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.name = o.name", pCache, total);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.name = o.name and o._key=" + orgIds.get(0), pCache, 
0);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.name = o.name and o._key=" + orgIds.get(3), pCache, 
3);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.name = o.name and o._key IN (" + orgIds.get(2) + "," 
+ orgIds.get(3) + ")", pCache, 5);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.name = o.name and o.name='obj-" + orgIds.get(3) + 
"'", pCache, 3);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery2() throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 = 
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, 
true)));
+            CacheConfiguration ccfg2 = 
cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
+
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
+
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger orgKey = new AtomicInteger();
+            AtomicInteger pKey = new AtomicInteger();
+
+            List<Integer> pIds = new ArrayList<>();
+
+            for (int i = 0; i < 3; i++) {
+                Integer orgId = keyForNode(aff, orgKey, node0);
+
+                orgCache.put(orgId, new Organization("org-" + orgId));
+
+                Integer pId = keyForNode(aff, pKey, node1);
+
+                pCache.put(pId, new Person(orgId, "p-" + orgId));
+
+                pIds.add(pId);
+            }
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and p._key >= 0", pCache, 3);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and p._key=" + pIds.get(0), pCache, 1);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and p._key in (" + pIds.get(0) + ", " 
+ pIds.get(1) + ")", pCache, 2);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testJoinQuery3() throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 = 
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, 
true)));
+            CacheConfiguration ccfg2 = 
cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
+
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
+
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger orgKey = new AtomicInteger();
+            AtomicInteger pKey = new AtomicInteger();
+
+            List<Integer> pIds = new ArrayList<>();
+
+            for (int i = 0; i < 3; i++) {
+                Integer orgId = keyForNode(aff, orgKey, node0);
+
+                orgCache.put(orgId, new Organization("org-" + orgId));
+
+                Integer pId = keyForNode(aff, pKey, node1);
+
+                pCache.put(pId, new Person(orgId + 100_000, "p-" + orgId));
+
+                pIds.add(pId);
+            }
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key", pCache, 9);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key and p._key=" + pIds.get(0), pCache, 
3);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key and p._key in (" + pIds.get(0) + ", " 
+ pIds.get(1) + ")", pCache, 6);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key and p._key >=" + pIds.get(0) + "and 
p._key <= " + pIds.get(2), pCache, 9);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery4() throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 =
+                
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(true, 
false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
+
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger pKey = new AtomicInteger();
+
+            Integer pId0 = keyForNode(aff, pKey, node0);
+
+            pCache.put(pId0, new Person(0, "p0"));
+
+            for (int i = 0; i < 3; i++) {
+                Integer pId = keyForNode(aff, pKey, node1);
+
+                pCache.put(pId, new Person(0, "p"));
+            }
+
+            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+                "from Person p1, Person p2 " +
+                "where p2._key > p1._key", pCache, 6);
+
+            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+                "from Person p1, Person p2 " +
+                "where p2._key > p1._key and p1._key=" + pId0, pCache, 3);
+
+            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+                "from Person p1, Person p2 " +
+                "where p2._key > p1._key and p1.name='p0'", pCache, 3);
+
+            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+                "from Person p1, Person p2 " +
+                "where p1.name > p2.name", pCache, 3);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery5() throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 = 
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, 
true)));
+            CacheConfiguration ccfg2 = 
cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
+
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
+
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger orgKey = new AtomicInteger();
+            AtomicInteger pKey = new AtomicInteger();
+
+            Integer orgId = keyForNode(aff, orgKey, node0);
+
+            orgCache.put(orgId, new Organization("org-" + orgId));
+
+            Integer pId = keyForNode(aff, pKey, node1);
+
+            pCache.put(pId, new Person(orgId, "p-" + orgId));
+
+            checkQuery("select o._key from \"org\".Organization o, Person p 
where p.orgId = o._key", pCache, 1);
+
+            // Distributed join is not enabled for expressions, just check 
query does not fail.
+            checkQuery("select o.name from \"org\".Organization o where o._key 
in " +
+                "(select o._key from \"org\".Organization o, Person p where 
p.orgId = o._key)", pCache, 0);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery6() throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 =
+                
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(true, 
true)));
+            CacheConfiguration ccfg2 =
+                
cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(true)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+
+            client.createCache(ccfg2);
+
+            putData1();
+
+            checkQuery("select _key, name from \"org\".Organization o " +
+                "inner join (select orgId from Person) p on p.orgId = o._key", 
pCache, total);
+
+            checkQuery("select o._key, o.name from (select _key, name from 
\"org\".Organization) o " +
+                "inner join Person p on p.orgId = o._key", pCache, total);
+
+            checkQuery("select o._key, o.name from (select _key, name from 
\"org\".Organization) o " +
+                "inner join (select orgId from Person) p on p.orgId = o._key", 
pCache, total);
+
+            checkQuery("select * from " +
+                "(select _key, name from \"org\".Organization) o " +
+                "inner join " +
+                "(select orgId from Person) p " +
+                "on p.orgId = o._key", pCache, total);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * @param sql SQL.
+     * @param cache Cache.
+     * @param expSize Expected results size.
+     * @param args Arguments.
+     */
+    private void checkQuery(String sql, IgniteCache<Object, Object> cache, int 
expSize, Object... args) {
+        log.info("Execute query: " + sql);
+
+        checkQuery(sql, cache, false, expSize, args);
+
+        checkQuery(sql, cache, true, expSize, args);
+    }
+
+    /**
+     * @param sql SQL.
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param expSize Expected results size.
+     * @param args Arguments.
+     */
+    private void checkQuery(String sql,
+        IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        int expSize,
+        Object... args) {
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        qry.setDistributedJoins(true);
+        qry.setEnforceJoinOrder(enforceJoinOrder);
+        qry.setArgs(args);
+
+        log.info("Plan: " + queryPlan(cache, qry));
+
+        QueryCursor<List<?>> cur = cache.query(qry);
+
+        List<List<?>> res = cur.getAll();
+
+        if (expSize != res.size())
+            log.info("Results: " + res);
+
+        assertEquals(expSize, res.size());
+    }
+
+    /**
+     * @param idxName Name index flag.
+     * @param idxOrgId Org ID index flag.
+     * @return Entity.
+     */
+    private QueryEntity personEntity(boolean idxName, boolean idxOrgId) {
+        QueryEntity entity = new QueryEntity();
+
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(Person.class.getName());
+
+        entity.addQueryField("orgId", Integer.class.getName(), null);
+        entity.addQueryField("name", String.class.getName(), null);
+
+        List<QueryIndex> idxs = new ArrayList<>();
+
+        if (idxName) {
+            QueryIndex idx = new QueryIndex("name");
+
+            idxs.add(idx);
+        }
+
+        if (idxOrgId) {
+            QueryIndex idx = new QueryIndex("orgId");
+
+            idxs.add(idx);
+        }
+
+        entity.setIndexes(idxs);
+
+        return entity;
+    }
+
+    /**
+     * @param idxName Name index flag.
+     * @return Entity.
+     */
+    private QueryEntity organizationEntity(boolean idxName) {
+        QueryEntity entity = new QueryEntity();
+
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(Organization.class.getName());
+
+        entity.addQueryField("name", String.class.getName(), null);
+
+        if (idxName) {
+            QueryIndex idx = new QueryIndex("name");
+
+            entity.setIndexes(F.asList(idx));
+        }
+
+        return entity;
+    }
+
+    /**
+     * @return Organization ids.
+     */
+    private List<Integer> putData1() {
+        total = 0;
+
+        Ignite client = grid(2);
+
+        Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+        IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
+        IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+
+        AtomicInteger pKey = new AtomicInteger();
+        AtomicInteger orgKey = new AtomicInteger();
+
+        ClusterNode node0 = ignite(0).cluster().localNode();
+        ClusterNode node1 = ignite(1).cluster().localNode();
+
+        List<Integer> data = new ArrayList<>();
+
+        for (int i = 0; i < 5; i++) {
+            int orgId = keyForNode(aff, orgKey, node0);
+
+            orgCache.put(orgId, new Organization("obj-" + orgId));
+
+            for (int j = 0; j < i; j++) {
+                personCache.put(keyForNode(aff, pKey, node1), new 
Person(orgId, "obj-" + orgId));
+
+                total++;
+            }
+
+            data.add(orgId);
+        }
+
+        return data;
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setBackups(0);
+        ccfg.setMemoryMode(memMode);
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class Person implements Serializable {
+        /** */
+        int orgId;
+
+        /** */
+        String name;
+
+        /**
+         * @param orgId Organization ID.
+         * @param name Name.
+         */
+        public Person(int orgId, String name) {
+            this.orgId = orgId;
+            this.name = name;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Organization implements Serializable {
+        /** */
+        String name;
+
+        /**
+         * @param name Name.
+         */
+        public Organization(String name) {
+            this.name = name;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinTest.java
new file mode 100644
index 0000000..ea3b141
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ */
+public class IgniteCacheDistributedJoinTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static Connection conn;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        CacheConfiguration<Integer, A> ccfga = new CacheConfiguration<>();
+
+        ccfga.setName("a");
+        ccfga.setSqlSchema("A");
+        ccfga.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        ccfga.setBackups(1);
+        ccfga.setCacheMode(CacheMode.PARTITIONED);
+        ccfga.setIndexedTypes(Integer.class, A.class);
+
+        CacheConfiguration<Integer, B> ccfgb = new CacheConfiguration<>();
+
+        ccfgb.setName("b");
+        ccfgb.setSqlSchema("B");
+        ccfgb.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        ccfgb.setBackups(1);
+        ccfgb.setCacheMode(CacheMode.PARTITIONED);
+        ccfgb.setIndexedTypes(Integer.class, B.class);
+
+        CacheConfiguration<Integer, C> ccfgc = new CacheConfiguration<>();
+
+        ccfgc.setName("c");
+        ccfgc.setSqlSchema("C");
+        ccfgc.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        ccfgc.setBackups(1);
+        ccfgc.setCacheMode(CacheMode.PARTITIONED);
+        ccfgc.setIndexedTypes(Integer.class, C.class);
+
+        cfg.setCacheConfiguration(ccfga, ccfgb, ccfgc);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(4);
+
+        awaitPartitionMapExchange();
+
+        conn = DriverManager.getConnection("jdbc:h2:mem:");
+
+        Statement s = conn.createStatement();
+
+        s.execute("create schema a");
+        s.execute("create schema b");
+        s.execute("create schema c");
+
+        s.execute("create table a.a(a bigint, b bigint, c bigint)");
+        s.execute("create table b.b(a bigint, b bigint, c bigint)");
+        s.execute("create table c.c(a bigint, b bigint, c bigint)");
+
+        s.execute("create index on a.a(a)");
+        s.execute("create index on a.a(b)");
+        s.execute("create index on a.a(c)");
+
+        s.execute("create index on b.b(a)");
+        s.execute("create index on b.b(b)");
+        s.execute("create index on b.b(c)");
+
+        s.execute("create index on c.c(a)");
+        s.execute("create index on c.c(b)");
+        s.execute("create index on c.c(c)");
+
+        GridRandom rnd = new GridRandom();
+        Ignite ignite = ignite(0);
+
+        IgniteCache<Integer,A> a = ignite.cache("a");
+        IgniteCache<Integer,B> b = ignite.cache("b");
+        IgniteCache<Integer,C> c = ignite.cache("c");
+
+        for (int i = 0; i < 100; i++) {
+            a.put(i, insert(s, new A(rnd.nextInt(50), rnd.nextInt(100), 
rnd.nextInt(150))));
+            b.put(i, insert(s, new B(rnd.nextInt(100), rnd.nextInt(50), 
rnd.nextInt(150))));
+            c.put(i, insert(s, new C(rnd.nextInt(150), rnd.nextInt(100), 
rnd.nextInt(50))));
+        }
+
+        checkSameResult(s, a, "select a, count(*) from a group by a order by 
a");
+        checkSameResult(s, a, "select b, count(*) from a group by b order by 
b");
+        checkSameResult(s, a, "select c, count(*) from a group by c order by 
c");
+
+        checkSameResult(s, b, "select a, count(*) from b group by a order by 
a");
+        checkSameResult(s, b, "select b, count(*) from b group by b order by 
b");
+        checkSameResult(s, b, "select c, count(*) from b group by c order by 
c");
+
+        checkSameResult(s, c, "select a, count(*) from c group by a order by 
a");
+        checkSameResult(s, c, "select b, count(*) from c group by b order by 
b");
+        checkSameResult(s, c, "select c, count(*) from c group by c order by 
c");
+
+        s.close();
+    }
+
+    /**
+     * @param s Statement.
+     * @param c Cache.
+     * @param qry Query.
+     * @throws SQLException If failed.
+     */
+    private <Z extends X> void checkSameResult(Statement s, 
IgniteCache<Integer, Z> c, String qry) throws SQLException {
+        s.executeUpdate("SET SCHEMA " + c.getName());
+
+        try (
+            ResultSet rs1 = s.executeQuery(qry);
+            QueryCursor<List<?>> rs2 = c.query(new 
SqlFieldsQuery(qry).setDistributedJoins(true))
+        ) {
+            Iterator<List<?>> iter = rs2.iterator();
+
+            for (;;) {
+                if (!rs1.next()) {
+                    assertFalse(iter.hasNext());
+
+                    return;
+                }
+
+                assertTrue(iter.hasNext());
+
+                List<?> row = iter.next();
+
+                for (int i = 0; i < row.size(); i++)
+                    assertEquals(rs1.getLong(i + 1), row.get(i));
+            }
+        }
+    }
+
+    /**
+     * @param s Statement.
+     * @param z Value.
+     * @return Value.
+     * @throws SQLException If failed.
+     */
+    private static <Z extends X> Z insert(Statement s, Z z) throws 
SQLException {
+        String tbl = z.getClass().getSimpleName();
+
+        tbl = tbl + "." + tbl;
+
+        String insert = "insert into " + tbl + " values(" + z.a + ", " + z.b + 
", " + z.c + ")";
+
+        s.executeUpdate(insert);
+
+        return z;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoins() throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteCache<Integer,A> a = ignite.cache("a");
+        IgniteCache<Integer,B> b = ignite.cache("b");
+        IgniteCache<Integer,C> c = ignite.cache("c");
+
+        Statement s = conn.createStatement();
+
+        checkSameResult(s, a, "select a.c, b.b, c.a from a.a, b.b, c.c where 
a.a = b.a and b.c = c.c order by a.c, b.b, c.a");
+        checkSameResult(s, b, "select a.a, b.c, c.b from a.a, b.b, c.c where 
a.b = b.b and b.a = c.a order by a.a, b.c, c.b");
+        checkSameResult(s, c, "select a.b, b.a, c.c from a.a, b.b, c.c where 
a.c = b.c and b.b = c.b order by a.b, b.a, c.c");
+
+        for (int i = 0; i < 150; i++) {
+            checkSameResult(s, a, "select a.c, b.b, c.a from a.a, b.b, c.c 
where " +
+                i + " = a.c and a.a = b.a and b.c = c.c order by a.c, b.b, 
c.a");
+            checkSameResult(s, b, "select a.a, b.c, c.b from a.a, b.b, c.c 
where " +
+                i + " = c.b and a.b = b.b and b.a = c.a order by a.a, b.c, 
c.b");
+            checkSameResult(s, c, "select a.b, b.a, c.c from a.a, b.b, c.c 
where " +
+                i + " = b.c and a.c = b.c and b.b = c.b order by a.b, b.a, 
c.c");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        U.closeQuiet(conn);
+        stopAllGrids();
+    }
+
+    /**
+     *
+     */
+    public static class X {
+        /** */
+        public long a;
+
+        /** */
+        public long b;
+
+        /** */
+        public long c;
+
+        /**
+         * @param a A.
+         * @param b B.
+         * @param c C.
+         */
+        public X(int a, int b, int c) {
+            this.a = a;
+            this.b = b;
+            this.c = c;
+        }
+
+        /** */
+        @QuerySqlField(index = true)
+        public long getA() {
+            return a;
+        }
+
+        /** */
+        @QuerySqlField(index = true)
+        public long getB() {
+            return b;
+        }
+
+        /** */
+        @QuerySqlField(index = true)
+        public long getC() {
+            return c;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class A extends X {
+        /**
+         * @param a A.
+         * @param b B.
+         * @param c C.
+         */
+        public A(int a, int b, int c) {
+            super(a, b, c);
+        }
+    }
+
+    /**
+     *
+     */
+    public static class B extends X {
+        /**
+         * @param a A.
+         * @param b B.
+         * @param c C.
+         */
+        public B(int a, int b, int c) {
+            super(a, b, c);
+        }
+    }
+
+    /**
+     *
+     */
+    public static class C extends X {
+        /**
+         * @param a A.
+         * @param b B.
+         * @param c C.
+         */
+        public C(int a, int b, int c) {
+            super(a, b, c);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
new file mode 100644
index 0000000..fa9032d
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import 
org.apache.ignite.internal.processors.query.h2.sql.AbstractH2CompareQueryTest;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheJoinPartitionedAndReplicatedCollocationTest extends 
AbstractH2CompareQueryTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String PERSON_CACHE = "person";
+
+    /** */
+    private static final String ACCOUNT_CACHE = "acc";
+
+    /** */
+    private boolean client;
+
+    /** */
+    private boolean h2DataInserted;
+
+    /** {@inheritDoc} */
+    @Override protected void setIndexedTypes(CacheConfiguration<?, ?> cc, 
CacheMode mode) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void initCacheAndDbData() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkAllDataEquals() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    private CacheConfiguration personCache() {
+        CacheConfiguration ccfg = configuration(PERSON_CACHE, 0);
+
+        // Person cache is replicated.
+        ccfg.setCacheMode(REPLICATED);
+
+        QueryEntity entity = new QueryEntity();
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(Person.class.getName());
+        entity.addQueryField("name", String.class.getName(), null);
+
+        ccfg.setQueryEntities(F.asList(entity));
+
+        return ccfg;
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration accountCache(int backups) {
+        CacheConfiguration ccfg = configuration(ACCOUNT_CACHE, backups);
+
+        QueryEntity entity = new QueryEntity();
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(Account.class.getName());
+        entity.addQueryField("personId", Integer.class.getName(), null);
+        entity.addQueryField("name", String.class.getName(), null);
+        entity.setIndexes(F.asList(new QueryIndex("personId")));
+
+        ccfg.setQueryEntities(F.asList(entity));
+
+        return ccfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration configuration(String name, int backups) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        client = true;
+
+        startGrid(SRVS);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Statement initializeH2Schema() throws SQLException {
+        Statement st = super.initializeH2Schema();
+
+        st.execute("CREATE SCHEMA \"person\"");
+        st.execute("CREATE SCHEMA \"acc\"");
+
+        st.execute("create table \"person\".PERSON" +
+            "  (_key int not null," +
+            "  _val other not null," +
+            "  name varchar(255))");
+
+        st.execute("create table \"acc\".ACCOUNT" +
+            "  (_key int not null," +
+            "  _val other not null," +
+            "  personId int," +
+            "  name varchar(255))");
+
+        return st;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin() throws Exception {
+        Ignite client = grid(SRVS);
+
+        client.createCache(personCache());
+
+        checkJoin(0);
+
+        h2DataInserted = true;
+
+        checkJoin(1);
+
+        checkJoin(2);
+    }
+
+    /**
+     * @param accBackups Account cache backups.
+     * @throws Exception If failed.
+     */
+    private void checkJoin(int accBackups) throws Exception {
+        Ignite client = grid(SRVS);
+
+        IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
+
+        Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+        AtomicInteger pKey = new AtomicInteger(100_000);
+        AtomicInteger accKey = new AtomicInteger();
+
+        ClusterNode node0 = ignite(0).cluster().localNode();
+        ClusterNode node1 = ignite(1).cluster().localNode();
+
+        try {
+            IgniteCache<Object, Object> accCache = 
client.createCache(accountCache(accBackups));
+
+            Integer pKey1 = keyForNode(aff, pKey, node0); // No accounts.
+            insert(personCache, pKey1, new Person("p1"));
+
+            Integer pKey2 = keyForNode(aff, pKey, node0); // 1 collocated 
account.
+            insert(personCache, pKey2, new Person("p2"));
+            insert(accCache, keyForNode(aff, accKey, node0), new 
Account(pKey2, "a-p2"));
+
+            Integer pKey3 = keyForNode(aff, pKey, node0); // 1 non-collocated 
account.
+            insert(personCache, pKey3, new Person("p3"));
+            insert(accCache, keyForNode(aff, accKey, node1), new 
Account(pKey3, "a-p3"));
+
+            Integer pKey4 = keyForNode(aff, pKey, node0); // 1 collocated, 1 
non-collocated account.
+            insert(personCache, pKey4, new Person("p4"));
+            insert(accCache, keyForNode(aff, accKey, node0), new 
Account(pKey4, "a-p4-1"));
+            insert(accCache, keyForNode(aff, accKey, node1), new 
Account(pKey4, "a-p4-2"));
+
+            Integer pKey5 = keyForNode(aff, pKey, node0); // 2 collocated 
accounts.
+            insert(personCache, pKey5, new Person("p5"));
+            insert(accCache, keyForNode(aff, accKey, node0), new 
Account(pKey5, "a-p5-1"));
+            insert(accCache, keyForNode(aff, accKey, node0), new 
Account(pKey5, "a-p5-1"));
+
+            Integer pKey6 = keyForNode(aff, pKey, node0); // 2 non-collocated 
accounts.
+            insert(personCache, pKey6, new Person("p6"));
+            insert(accCache, keyForNode(aff, accKey, node1), new 
Account(pKey6, "a-p5-1"));
+            insert(accCache, keyForNode(aff, accKey, node1), new 
Account(pKey6, "a-p5-1"));
+
+            Integer[] keys = {pKey1, pKey2, pKey3, pKey4, pKey5, pKey6};
+
+            for (int i = 0; i < keys.length; i++) {
+                log.info("Test key: " + i);
+
+                Integer key = keys[i];
+
+                checkQuery("select p._key, p.name, a.name " +
+                    "from \"person\".Person p, \"acc\".Account a " +
+                    "where p._key = a.personId and p._key=?", accCache, true, 
key);
+
+                checkQuery("select p._key, p.name, a.name " +
+                    "from \"acc\".Account a, \"person\".Person p " +
+                    "where p._key = a.personId and p._key=?", accCache, true, 
key);
+
+                checkQuery("select p._key, p.name, a.name " +
+                    "from \"person\".Person p right outer join \"acc\".Account 
a " +
+                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
+
+                checkQuery("select p._key, p.name, a.name " +
+                    "from \"acc\".Account a left outer join \"person\".Person 
p " +
+                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
+
+//                checkQuery("select p._key, p.name, a.name " +
+//                    "from \"acc\".Account a right outer join 
\"person\".Person p " +
+//                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
+//
+//                checkQuery("select p._key, p.name, a.name " +
+//                    "from \"person\".Person p left outer join 
\"acc\".Account a " +
+//                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
+            }
+        }
+        finally {
+            client.destroyCache(ACCOUNT_CACHE);
+
+            personCache.removeAll();
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param p Person.
+     * @throws Exception If failed.
+     */
+    private void insert(IgniteCache<Object, Object> cache, int key, Person p) 
throws Exception {
+        cache.put(key, p);
+
+        if (h2DataInserted)
+            return;
+
+        try(PreparedStatement st = conn.prepareStatement("insert into 
\"person\".PERSON " +
+            "(_key, _val, name) values(?, ?, ?)")) {
+            st.setObject(1, key);
+            st.setObject(2, p);
+            st.setObject(3, p.name);
+
+            st.executeUpdate();
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param  a Account.
+     * @throws Exception If failed.
+     */
+    private void insert(IgniteCache<Object, Object> cache, int key, Account a) 
throws Exception {
+        cache.put(key, a);
+
+        if (h2DataInserted)
+            return;
+
+        try(PreparedStatement st = conn.prepareStatement("insert into 
\"acc\".ACCOUNT " +
+            "(_key, _val, personId, name) values(?, ?, ?, ?)")) {
+            st.setObject(1, key);
+            st.setObject(2, a);
+            st.setObject(3, a.personId);
+            st.setObject(4, a.name);
+
+            st.executeUpdate();
+        }
+    }
+
+    /**
+     * @param sql SQL.
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param args Arguments.
+     * @throws Exception If failed.
+     */
+    private void checkQuery(String sql,
+        IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        Object... args) throws Exception {
+        String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql)
+            .setArgs(args)
+            .setDistributedJoins(true)
+            .setEnforceJoinOrder(enforceJoinOrder))
+            .getAll().get(0).get(0);
+
+        log.info("Plan: " + plan);
+
+        compareQueryRes0(cache, sql, true, enforceJoinOrder, args, 
Ordering.RANDOM);
+    }
+
+    /**
+     *
+     */
+    private static class Account implements Serializable {
+        /** */
+        int personId;
+
+        /** */
+        String name;
+
+        /**
+         * @param personId Person ID.
+         * @param name Name.
+         */
+        public Account(int personId, String name) {
+            this.personId = personId;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Account.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Person implements Serializable {
+        /** */
+        String name;
+
+        /**
+         * @param name Name.
+         */
+        public Person(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
index 39c0cb5..1d528c9 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.QueryEntity;
@@ -32,8 +29,15 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import javax.cache.CacheException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -53,6 +57,9 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest 
extends GridCommonAbstr
     private static final String ORG_CACHE = "org";
 
     /** */
+    private static final String ORG_CACHE_REPLICATED = "orgRepl";
+
+    /** */
     private boolean client;
 
     /** {@inheritDoc} */
@@ -95,6 +102,22 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest 
extends GridCommonAbstr
             ccfgs.add(ccfg);
         }
 
+        {
+            CacheConfiguration ccfg = configuration(ORG_CACHE_REPLICATED);
+
+            ccfg.setCacheMode(REPLICATED);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Organization.class.getName());
+            entity.addQueryField("id", Integer.class.getName(), null);
+            entity.addQueryField("name", String.class.getName(), null);
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
         cfg.setCacheConfiguration(ccfgs.toArray(new 
CacheConfiguration[ccfgs.size()]));
 
         cfg.setClientMode(client);
@@ -144,10 +167,12 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest 
extends GridCommonAbstr
 
         IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
         IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+        IgniteCache<Object, Object> orgCacheRepl = 
client.cache(ORG_CACHE_REPLICATED);
 
         List<Integer> keys = primaryKeys(ignite(0).cache(PERSON_CACHE), 3, 
200_000);
 
         orgCache.put(keys.get(0), new Organization(0, "org1"));
+        orgCacheRepl.put(keys.get(0), new Organization(0, "org1"));
         personCache.put(keys.get(1), new Person(0, "p1"));
         personCache.put(keys.get(2), new Person(0, "p2"));
 
@@ -160,12 +185,54 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest 
extends GridCommonAbstr
             "on (p.orgId = o.id)", orgCache, 2);
 
         checkQuery("select o.name, p._key, p.name " +
-            "from \"person\".Person p left join \"org\".Organization o " +
-            "on (p.orgId = o.id)", orgCache, 2);
+            "from \"person\".Person p join \"orgRepl\".Organization o " +
+            "on (p.orgId = o.id)", orgCacheRepl, 2);
+
+        checkQuery("select o.name, p._key, p.name " +
+            "from \"orgRepl\".Organization o join \"person\".Person p " +
+            "on (p.orgId = o.id)", orgCacheRepl, 2);
+
+        checkQuery("select p.name from \"person\".Person p", 
ignite(0).cache(PERSON_CACHE), 2);
+        checkQuery("select p.name from \"person\".Person p", 
ignite(1).cache(PERSON_CACHE), 2);
+
+        for (int i = 0; i < 10; i++)
+            checkQuery("select p.name from \"person\".Person p", personCache, 
2);
 
         checkQuery("select o.name, p._key, p.name " +
             "from \"org\".Organization o left join \"person\".Person p " +
             "on (p.orgId = o.id)", orgCache, 2);
+
+        checkQuery("select o.name, p._key, p.name " +
+            "from \"person\".Person p left join \"orgRepl\".Organization o " +
+            "on (p.orgId = o.id)", orgCacheRepl, 2);
+
+        checkQuery("select o.name, p._key, p.name " +
+            "from \"orgRepl\".Organization o left join \"person\".Person p " +
+            "on (p.orgId = o.id)", orgCacheRepl, 2);
+
+        checkQueryFails("select o.name, p._key, p.name " +
+                "from \"person\".Person p left join \"org\".Organization o " +
+                "on (p.orgId = o.id)", personCache);
+
+        checkQueryFails("select o.name, p._key, p.name " +
+                "from \"org\".Organization o right join \"person\".Person p " +
+                "on (p.orgId = o.id)", personCache);
+    }
+
+    /**
+     * @param sql SQL.
+     * @param cache Cache.
+     */
+    private void checkQueryFails(final String sql, final IgniteCache<Object, 
Object> cache) {
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+                cache.query(qry).getAll();
+
+                return null;
+            }
+        }, CacheException.class, null);
     }
 
     /**
@@ -233,6 +300,7 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest 
extends GridCommonAbstr
         int id;
 
         /**
+         * @param id ID.
          * @param name Name.
          */
         public Organization(int id, String name) {

Reply via email to