http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
new file mode 100644
index 0000000..0e6806f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.typedef.CAX;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Test for distributed queries with node restarts.
+ */
+public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends 
GridCommonAbstractTest {
+    /**
+     * Join of Person, Product, Company and Purchase that counts the joined rows per company key,
+     * ordered by count descending and then by company key. Used when {@code broadcastQry} is
+     * {@code false}.
+     */
+    private static final String QRY_0 = "select co._key, count(*) cnt\n" +
+        "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, 
\"pu\".Purchase pu\n" +
+        "where pe._key = pu.personId and pu.productId = pr._key and 
pr.companyId = co._key \n" +
+        "group by co._key order by cnt desc, co._key";
+
+    /**
+     * Same join and grouping as {@link #QRY_0} with the tables listed in a different order
+     * (Company first). Used when {@code broadcastQry} is {@code true}, together with an enforced
+     * join order; the test then expects {@code "batched:broadcast"} in the query plan.
+     */
+    private static final String QRY_0_BROADCAST = "select co._key, count(*) 
cnt\n" +
+        "from \"co\".Company co, \"pr\".Product pr, \"pu\".Purchase pu, 
\"pe\".Person pe \n" +
+        "where pe._key = pu.personId and pu.productId = pr._key and 
pr.companyId = co._key \n" +
+        "group by co._key order by cnt desc, co._key";
+
+    /**
+     * Join of Product and Company returning (product key, company key) pairs ordered by company
+     * key and then product key. Used when {@code broadcastQry} is {@code false}.
+     */
+    private static final String QRY_1 = "select pr._key, co._key\n" +
+        "from \"pr\".Product pr, \"co\".Company co\n" +
+        "where pr.companyId = co._key\n" +
+        "order by co._key, pr._key ";
+
+    /**
+     * Same join as {@link #QRY_1} with Company listed first. Used when {@code broadcastQry} is
+     * {@code true}, together with an enforced join order.
+     */
+    private static final String QRY_1_BROADCAST = "select pr._key, co._key\n" +
+        "from \"co\".Company co, \"pr\".Product pr \n" +
+        "where pr.companyId = co._key\n" +
+        "order by co._key, pr._key ";
+
+    /** Number of nodes started before the tests; also the size of the per-node lock array. */
+    private static final int GRID_CNT = 6;
+
+    /** Number of entries put into the "pe" (Person) cache, keys {@code 0..PERS_CNT-1}. */
+    private static final int PERS_CNT = 600;
+
+    /** Number of entries put into the "pu" (Purchase) cache, keys {@code 0..PURCHASE_CNT-1}. */
+    private static final int PURCHASE_CNT = 6000;
+
+    /** Number of entries put into the "co" (Company) cache, keys {@code 0..COMPANY_CNT-1}. */
+    private static final int COMPANY_CNT = 25;
+
+    /** Number of entries put into the "pr" (Product) cache, keys {@code 0..PRODUCT_CNT-1}. */
+    private static final int PRODUCT_CNT = 100;
+
+    /** Single IP finder instance handed to the discovery SPI of every node configured by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On top of the inherited configuration this sets TCP discovery with the shared IP finder and
+     * registers four partitioned, transactional caches named "pe", "pu", "co" and "pr", each with
+     * two backups and one indexed value type keyed by {@code Integer}.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        CacheConfiguration<?, ?>[] cacheCfgs = new CacheConfiguration[4];
+
+        int idx = 0;
+
+        for (String cacheName : F.asList("pe", "pu", "co", "pr")) {
+            CacheConfiguration<?, ?> cacheCfg = defaultCacheConfiguration();
+
+            cacheCfg.setName(cacheName);
+            cacheCfg.setCacheMode(PARTITIONED);
+            cacheCfg.setBackups(2);
+            cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+            cacheCfg.setAtomicityMode(TRANSACTIONAL);
+            cacheCfg.setRebalanceMode(SYNC);
+            cacheCfg.setLongQueryWarningTimeout(15_000);
+            cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 60));
+
+            Class<?> valType = indexedValueType(cacheName);
+
+            // Names without a mapping get no indexed types, as before.
+            if (valType != null)
+                cacheCfg.setIndexedTypes(Integer.class, valType);
+
+            cacheCfgs[idx] = cacheCfg;
+
+            idx++;
+        }
+
+        cfg.setCacheConfiguration(cacheCfgs);
+
+        return cfg;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Value class indexed in the given cache, or {@code null} if the name is not one of
+     *      "pe", "pu", "co", "pr".
+     */
+    private static Class<?> indexedValueType(String cacheName) {
+        if ("pe".equals(cacheName))
+            return Person.class;
+
+        if ("pu".equals(cacheName))
+            return Purchase.class;
+
+        if ("co".equals(cacheName))
+            return Company.class;
+
+        if ("pr".equals(cacheName))
+            return Product.class;
+
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the inherited setup, starts {@link #GRID_CNT} nodes and then fills the test caches.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(GRID_CNT);
+
+        fillCaches();
+    }
+
+    /**
+     * Loads the test data through node 0: companies, then products (each linked to a random
+     * company), then persons, then purchases (each linking a random person to a random product).
+     */
+    private void fillCaches() {
+        Random rnd = new GridRandom();
+
+        IgniteCache<Integer, Company> companies = grid(0).cache("co");
+
+        for (int companyId = 0; companyId < COMPANY_CNT; companyId++) {
+            companies.put(companyId, new Company(companyId));
+        }
+
+        IgniteCache<Integer, Product> products = grid(0).cache("pr");
+
+        for (int productId = 0; productId < PRODUCT_CNT; productId++) {
+            int companyId = rnd.nextInt(COMPANY_CNT);
+
+            products.put(productId, new Product(productId, companyId));
+        }
+
+        IgniteCache<Integer, Person> persons = grid(0).cache("pe");
+
+        for (int personId = 0; personId < PERS_CNT; personId++) {
+            persons.put(personId, new Person(personId));
+        }
+
+        IgniteCache<Integer, Purchase> purchases = grid(0).cache("pu");
+
+        // Arguments are evaluated left to right, so the person ID is drawn before the product ID.
+        for (int purchaseId = 0; purchaseId < PURCHASE_CNT; purchaseId++) {
+            purchases.put(purchaseId, new Purchase(rnd.nextInt(PERS_CNT), rnd.nextInt(PRODUCT_CNT)));
+        }
+    }
+    /**
+     * Runs the node-restart scenario with the non-broadcast queries ({@link #QRY_0} and
+     * {@link #QRY_1}).
+     *
+     * @throws Exception If failed.
+     */
+    public void testRestarts() throws Exception {
+        restarts(false);
+    }
+
+    /**
+     * Runs the node-restart scenario with the broadcast queries ({@link #QRY_0_BROADCAST} and
+     * {@link #QRY_1_BROADCAST}).
+     *
+     * @throws Exception If failed.
+     */
+    public void testRestartsBroadcast() throws Exception {
+        restarts(true);
+    }
+
+    /**
+     * Runs distributed-join queries from several threads while other threads stop and start nodes,
+     * and checks that every query that completes returns the result captured before the restarts
+     * began.
+     * <p>
+     * Each node index has a slot in a lock array: {@code 0} means the node is free, {@code 1}
+     * means a query thread is running a query through it, {@code -1} means a restart thread is
+     * restarting it. A thread picks random indexes until it wins the compare-and-set on a free slot.
+     *
+     * @param broadcastQry If {@code true} tests broadcast query.
+     * @throws Exception If failed.
+     */
+    private void restarts(final boolean broadcastQry) throws Exception {
+        int duration = 90 * 1000;
+        int qryThreadNum = 4;
+        int restartThreadsNum = 2; // 4 + 2 = 6 nodes
+        final int nodeLifeTime = 4000;
+        final int logFreq = 100;
+
+        final AtomicIntegerArray locks = new AtomicIntegerArray(GRID_CNT);
+
+        SqlFieldsQuery qry0 = joinQuery(QRY_0, QRY_0_BROADCAST, broadcastQry);
+
+        String plan = queryPlan(grid(0).cache("pu"), qry0);
+
+        X.println("Plan1: " + plan);
+
+        assertEquals(broadcastQry, plan.contains("batched:broadcast"));
+
+        final List<List<?>> pRes = grid(0).cache("pu").query(qry0).getAll();
+
+        Thread.sleep(3000);
+
+        // Same query again after the pause: the reference result must be repeatable.
+        assertEquals(pRes, grid(0).cache("pu").query(qry0).getAll());
+
+        SqlFieldsQuery qry1 = joinQuery(QRY_1, QRY_1_BROADCAST, broadcastQry);
+
+        plan = queryPlan(grid(0).cache("co"), qry1);
+
+        X.println("Plan2: " + plan);
+
+        assertEquals(broadcastQry, plan.contains("batched:broadcast"));
+
+        final List<List<?>> rRes = grid(0).cache("co").query(qry1).getAll();
+
+        assertFalse(pRes.isEmpty());
+        assertFalse(rRes.isEmpty());
+
+        final AtomicInteger qryCnt = new AtomicInteger();
+        final AtomicBoolean qrysDone = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                GridRandom rnd = new GridRandom();
+
+                while (!qrysDone.get()) {
+                    int g;
+
+                    // Spin until a node that is neither queried nor restarting is claimed.
+                    do {
+                        g = rnd.nextInt(locks.length());
+                    }
+                    while (!locks.compareAndSet(g, 0, 1));
+
+                    try {
+                        if (rnd.nextBoolean()) {
+                            IgniteCache<?, ?> cache = grid(g).cache("pu");
+
+                            SqlFieldsQuery qry = joinQuery(QRY_0, QRY_0_BROADCAST, broadcastQry);
+
+                            boolean smallPageSize = rnd.nextBoolean();
+
+                            qry.setPageSize(smallPageSize ? 30 : 1000);
+
+                            try {
+                                assertEquals(pRes, cache.query(qry).getAll());
+                            }
+                            catch (CacheException e) {
+                                // Only a small page size is allowed to surface a failure here.
+                                assertTrue("On large page size must retry.", smallPageSize);
+
+                                if (!isRemoteFetchFailure(e)) {
+                                    log.error("Unexpected query failure.", e);
+
+                                    fail("Must fail inside of GridResultPage.fetchNextPage or subclass.");
+                                }
+                            }
+                        }
+                        else {
+                            IgniteCache<?, ?> cache = grid(g).cache("co");
+
+                            // A fresh query per iteration, not the instance shared with the main thread.
+                            SqlFieldsQuery qry = joinQuery(QRY_1, QRY_1_BROADCAST, broadcastQry);
+
+                            assertEquals(rRes, cache.query(qry).getAll());
+                        }
+                    }
+                    finally {
+                        // Release the node even if an assertion failed, so it does not stay claimed.
+                        locks.set(g, 0);
+                    }
+
+                    int c = qryCnt.incrementAndGet();
+
+                    if (c % logFreq == 0)
+                        info("Executed queries: " + c);
+                }
+            }
+        }, qryThreadNum, "query-thread");
+
+        final AtomicInteger restartCnt = new AtomicInteger();
+
+        final AtomicBoolean restartsDone = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
+            @SuppressWarnings({"BusyWait"})
+            @Override public Object call() throws Exception {
+                GridRandom rnd = new GridRandom();
+
+                while (!restartsDone.get()) {
+                    int g;
+
+                    // Spin until a node that is neither queried nor restarting is claimed.
+                    do {
+                        g = rnd.nextInt(locks.length());
+                    }
+                    while (!locks.compareAndSet(g, 0, -1));
+
+                    log.info("Stop node: " + g);
+
+                    stopGrid(g);
+
+                    Thread.sleep(rnd.nextInt(nodeLifeTime));
+
+                    log.info("Start node: " + g);
+
+                    startGrid(g);
+
+                    Thread.sleep(rnd.nextInt(nodeLifeTime));
+
+                    // Not released on failure: a node that did not come back must stay unavailable.
+                    locks.set(g, 0);
+
+                    int c = restartCnt.incrementAndGet();
+
+                    if (c % logFreq == 0)
+                        info("Node restarts: " + c);
+                }
+
+                return true;
+            }
+        }, restartThreadsNum, "restart-thread");
+
+        Thread.sleep(duration);
+
+        info("Stopping..");
+
+        restartsDone.set(true);
+        qrysDone.set(true);
+
+        fut2.get();
+        fut1.get();
+
+        info("Stopped.");
+    }
+
+    /**
+     * Creates a new distributed-join query; the broadcast variant additionally enforces join order.
+     *
+     * @param sql SQL used when {@code broadcast} is {@code false}.
+     * @param broadcastSql SQL used when {@code broadcast} is {@code true}.
+     * @param broadcast Whether to build the broadcast variant.
+     * @return New query instance.
+     */
+    private static SqlFieldsQuery joinQuery(String sql, String broadcastSql, boolean broadcast) {
+        if (broadcast)
+            return new SqlFieldsQuery(broadcastSql).setDistributedJoins(true).setEnforceJoinOrder(true);
+
+        return new SqlFieldsQuery(sql).setDistributedJoins(true);
+    }
+
+    /**
+     * Walks the cause chain looking for a {@link CacheException} whose message starts with
+     * {@code "Failed to fetch data from node:"}.
+     *
+     * @param e Exception thrown by the query.
+     * @return {@code true} if such an exception is found in the chain.
+     */
+    private static boolean isRemoteFetchFailure(CacheException e) {
+        for (Throwable th = e; th != null; th = th.getCause()) {
+            if (!(th instanceof CacheException))
+                continue;
+
+            String msg = th.getMessage();
+
+            if (msg != null && msg.startsWith("Failed to fetch data from node:"))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stops all grids.
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Value type of the "pe" cache, indexed under {@code Integer} keys.
+     */
+    private static class Person implements Serializable {
+        /** Person ID, exposed as an indexed SQL field; the test data uses the cache key as the ID. */
+        @QuerySqlField(index = true)
+        int id;
+
+        /**
+         * @param id ID.
+         */
+        Person(int id) {
+            this.id = id;
+        }
+    }
+
+    /**
+     * Value type of the "pu" cache, indexed under {@code Integer} keys. Links a person to a product.
+     */
+    private static class Purchase implements Serializable {
+        /** Key of the purchasing person; indexed SQL field joined against {@code pe._key}. */
+        @QuerySqlField(index = true)
+        int personId;
+
+        /** Key of the purchased product; indexed SQL field joined against {@code pr._key}. */
+        @QuerySqlField(index = true)
+        int productId;
+
+        /**
+         * @param personId Person ID.
+         * @param productId Product ID.
+         */
+        Purchase(int personId, int productId) {
+            this.personId = personId;
+            this.productId = productId;
+        }
+    }
+
+    /**
+     * Value type of the "co" cache, indexed under {@code Integer} keys.
+     */
+    private static class Company implements Serializable {
+        /** Company ID, exposed as an indexed SQL field; the test data uses the cache key as the ID. */
+        @QuerySqlField(index = true)
+        int id;
+
+        /**
+         * @param id ID.
+         */
+        Company(int id) {
+            this.id = id;
+        }
+    }
+
+    /**
+     * Value type of the "pr" cache, indexed under {@code Integer} keys. Links a product to a company.
+     */
+    private static class Product implements Serializable {
+        /** Product ID, exposed as an indexed SQL field; the test data uses the cache key as the ID. */
+        @QuerySqlField(index = true)
+        int id;
+
+        /** Key of the owning company; indexed SQL field joined against {@code co._key}. */
+        @QuerySqlField(index = true)
+        int companyId;
+
+        /**
+         * @param id ID.
+         * @param companyId Company ID.
+         */
+        Product(int id, int companyId) {
+            this.id = id;
+            this.companyId = companyId;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
deleted file mode 100644
index 82456fb..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.reducefields;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.affinity.AffinityKey;
-import org.apache.ignite.cache.query.annotations.QuerySqlField;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteReducer;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
-/**
- * Tests for reduce fields queries.
- */
-public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends 
GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
-
-    /** Flag indicating if starting node should have cache. */
-    protected boolean hasCache;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        if (hasCache)
-            cfg.setCacheConfiguration(cache(null));
-        else
-            cfg.setCacheConfiguration();
-
-        cfg.setDiscoverySpi(discovery());
-
-        return cfg;
-    }
-
-    /**
-     * @return Distribution.
-     */
-    protected NearCacheConfiguration nearConfiguration() {
-        return new NearCacheConfiguration();
-    }
-
-    /**
-     * @param name Cache name.
-     * @return Cache.
-     */
-    private CacheConfiguration cache(@Nullable String name) {
-        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
-
-        cache.setName(name);
-        cache.setCacheMode(cacheMode());
-        cache.setAtomicityMode(atomicityMode());
-        cache.setNearConfiguration(nearConfiguration());
-        cache.setWriteSynchronizationMode(FULL_SYNC);
-        cache.setRebalanceMode(SYNC);
-        cache.setIndexedTypes(
-            String.class, Organization.class,
-            AffinityKey.class, Person.class
-        );
-
-        if (cacheMode() == PARTITIONED)
-            cache.setBackups(1);
-
-        return cache;
-    }
-
-    /**
-     * @return Discovery SPI.
-     */
-    private static DiscoverySpi discovery() {
-        TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
-        spi.setIpFinder(IP_FINDER);
-
-        return spi;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        hasCache = true;
-
-        startGridsMultiThreaded(gridCount());
-
-        hasCache = false;
-
-        startGrid(gridCount());
-
-        IgniteCache<String, Organization> orgCache = grid(0).cache(null);
-
-        assert orgCache != null;
-
-        orgCache.put("o1", new Organization(1, "A"));
-        orgCache.put("o2", new Organization(2, "B"));
-
-        IgniteCache<AffinityKey<String>, Person> personCache = 
grid(0).cache(null);
-
-        assert personCache != null;
-
-        personCache.put(new AffinityKey<>("p1", "o1"), new Person("John 
White", 25, 1));
-        personCache.put(new AffinityKey<>("p2", "o1"), new Person("Joe Black", 
35, 1));
-        personCache.put(new AffinityKey<>("p3", "o2"), new Person("Mike 
Green", 40, 2));
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
-    /**
-     * @return cache mode.
-     */
-    protected abstract CacheMode cacheMode();
-
-    /**
-     * @return Number of grids to start.
-     */
-    protected abstract int gridCount();
-
-    /**
-     * @return Cache atomicity mode.
-     */
-    protected CacheAtomicityMode atomicityMode() {
-        return TRANSACTIONAL;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNoDataInCache() throws Exception {
-        CacheQuery<List<?>> qry = ((IgniteKernal)grid(0))
-            .getCache(null).context().queries().createSqlFieldsQuery("select 
age from Person where orgId = 999", false);
-
-        Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new 
AverageRemoteReducer()).get();
-
-        assertEquals("Result", 0, F.reduce(res, new 
AverageLocalReducer()).intValue());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAverageQuery() throws Exception {
-        CacheQuery<List<?>> qry = 
((IgniteKernal)grid(0)).getCache(null).context().queries().
-            createSqlFieldsQuery("select age from Person", false);
-
-        Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new 
AverageRemoteReducer()).get();
-
-        assertEquals("Average", 33, F.reduce(res, new 
AverageLocalReducer()).intValue());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testAverageQueryWithArguments() throws Exception {
-        CacheQuery<List<?>> qry = 
((IgniteKernal)grid(0)).getCache(null).context().queries().createSqlFieldsQuery(
-            "select age from Person where orgId = ?", false);
-
-        Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new 
AverageRemoteReducer(), 1).get();
-
-        assertEquals("Average", 30, F.reduce(res, new 
AverageLocalReducer()).intValue());
-    }
-
-//    /**
-//     * @throws Exception If failed.
-//     */
-//    public void testFilters() throws Exception {
-//        GridCacheReduceFieldsQuery<Object, Object, GridBiTuple<Integer, 
Integer>, Integer> qry = ((IgniteKernal)grid(0)).cache(null)
-//            .queries().createReduceFieldsQuery("select age from Person");
-//
-//        qry = qry.remoteKeyFilter(
-//            new GridPredicate<Object>() {
-//                @Override public boolean apply(Object e) {
-//                    return !"p2".equals(((AffinityKey)e).key());
-//                }
-//            }
-//        ).remoteValueFilter(
-//            new P1<Object>() {
-//                @Override public boolean apply(Object e) {
-//                    return !"Mike Green".equals(((Person)e).name);
-//                }
-//            }
-//        );
-//
-//        qry = qry.remoteReducer(new AverageRemoteReducer()).localReducer(new 
AverageLocalReducer());
-//
-//        Integer avg = qry.reduce().get();
-//
-//        assertNotNull("Average", avg);
-//        assertEquals("Average", 25, avg.intValue());
-//    }
-
-//    /**
-//     * @throws Exception If failed.
-//     */
-//    public void testOnProjectionWithFilter() throws Exception {
-//        P2<AffinityKey<String>, Person> p = new P2<AffinityKey<String>, 
Person>() {
-//            @Override public boolean apply(AffinityKey<String> key, Person 
val) {
-//                return val.orgId == 1;
-//            }
-//        };
-//
-//        InternalCache<AffinityKey<String>, Person> cachePrj =
-//            grid(0).<AffinityKey<String>, Person>cache(null).projection(p);
-//
-//        GridCacheReduceFieldsQuery<AffinityKey<String>, Person, 
GridBiTuple<Integer, Integer>, Integer> qry =
-//            cachePrj.queries().createReduceFieldsQuery("select age from 
Person");
-//
-//        qry = qry.remoteValueFilter(
-//            new P1<Person>() {
-//                @Override public boolean apply(Person e) {
-//                    return !"Joe Black".equals(e.name);
-//                }
-//            });
-//
-//        qry = qry.remoteReducer(new AverageRemoteReducer()).localReducer(new 
AverageLocalReducer());
-//
-//        Integer avg = qry.reduce().get();
-//
-//        assertNotNull("Average", avg);
-//        assertEquals("Average", 25, avg.intValue());
-//    }
-
-    /**
-     * @return true if cache mode is replicated, false otherwise.
-     */
-    private boolean isReplicatedMode() {
-        return cacheMode() == REPLICATED;
-    }
-
-    /**
-     * Person.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    private static class Person implements Serializable {
-        /** Name. */
-        @QuerySqlField(index = false)
-        private final String name;
-
-        /** Age. */
-        @QuerySqlField(index = true)
-        private final int age;
-
-        /** Organization ID. */
-        @QuerySqlField(index = true)
-        private final int orgId;
-
-        /**
-         * @param name Name.
-         * @param age Age.
-         * @param orgId Organization ID.
-         */
-        private Person(String name, int age, int orgId) {
-            assert !F.isEmpty(name);
-            assert age > 0;
-            assert orgId > 0;
-
-            this.name = name;
-            this.age = age;
-            this.orgId = orgId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            Person person = (Person)o;
-
-            return age == person.age && orgId == person.orgId && 
name.equals(person.name);
-
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = name.hashCode();
-
-            res = 31 * res + age;
-            res = 31 * res + orgId;
-
-            return res;
-        }
-    }
-
-    /**
-     * Organization.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    private static class Organization implements Serializable {
-        /** ID. */
-        @QuerySqlField
-        private final int id;
-
-        /** Name. */
-        @QuerySqlField(index = false)
-        private final String name;
-
-        /**
-         * @param id ID.
-         * @param name Name.
-         */
-        private Organization(int id, String name) {
-            assert id > 0;
-            assert !F.isEmpty(name);
-
-            this.id = id;
-            this.name = name;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            Organization that = (Organization)o;
-
-            return id == that.id && name.equals(that.name);
-
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = id;
-
-            res = 31 * res + name.hashCode();
-
-            return res;
-        }
-    }
-
-    /**
-     * Average remote reducer factory.
-     */
-    protected static class AverageRemoteReducer implements 
IgniteReducer<List<?>, IgniteBiTuple<Integer, Integer>> {
-        /** */
-        private int sum;
-
-        /** */
-        private int cnt;
-
-        /** {@inheritDoc} */
-        @Override public boolean collect(List<?> e) {
-            sum += (Integer)e.get(0);
-
-            cnt++;
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteBiTuple<Integer, Integer> reduce() {
-            return F.t(sum, cnt);
-        }
-    }
-
-    /**
-     * Average local reducer factory.
-     */
-    protected static class AverageLocalReducer implements 
IgniteReducer<IgniteBiTuple<Integer, Integer>, Integer> {
-        /** */
-        private int sum;
-
-        /** */
-        private int cnt;
-
-        /** {@inheritDoc} */
-        @Override public boolean collect(IgniteBiTuple<Integer, Integer> t) {
-            sum += t.get1();
-            cnt += t.get2();
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer reduce() {
-            return cnt == 0 ? 0 : sum / cnt;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java
deleted file mode 100644
index 6aa467b..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.reducefields;
-
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-
-/**
- * Reduce fields queries tests for partitioned cache.
- */
-public class GridCacheReduceFieldsQueryAtomicSelfTest extends GridCacheReduceFieldsQueryPartitionedSelfTest {
-    /** {@inheritDoc} */
-    @Override protected CacheAtomicityMode atomicityMode() {
-        return ATOMIC;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected NearCacheConfiguration nearConfiguration() {
-        return null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryLocalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryLocalSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryLocalSelfTest.java
deleted file mode 100644
index 17f024e..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryLocalSelfTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.reducefields;
-
-import org.apache.ignite.cache.CacheMode;
-
-import static org.apache.ignite.cache.CacheMode.LOCAL;
-
-/**
- * Reduce fields queries tests for local cache.
- */
-public class GridCacheReduceFieldsQueryLocalSelfTest extends GridCacheAbstractReduceFieldsQuerySelfTest {
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return LOCAL;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 1;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java
deleted file mode 100644
index f5b16f5..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.reducefields;
-
-import java.util.List;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.lang.IgniteBiTuple;
-
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-
-/**
- * Reduce fields queries tests for partitioned cache.
- */
-public class GridCacheReduceFieldsQueryPartitionedSelfTest extends GridCacheAbstractReduceFieldsQuerySelfTest {
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return PARTITIONED;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 3;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testIncludeBackups() throws Exception {
-        CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).getCache(null).context().queries().
-            createSqlFieldsQuery("select age from Person", false);
-
-        qry.includeBackups(true);
-
-        int sum = 0;
-
-        for (IgniteBiTuple<Integer, Integer> tuple : qry.execute(new AverageRemoteReducer()).get())
-            sum += tuple.get1();
-
-        // One backup, so sum is two times greater
-        assertEquals("Sum", 200, sum);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryReplicatedSelfTest.java
deleted file mode 100644
index 3e08d6d..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryReplicatedSelfTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.reducefields;
-
-import org.apache.ignite.cache.CacheMode;
-
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-
-/**
- * Reduce fields queries tests for replicated cache.
- */
-public class GridCacheReduceFieldsQueryReplicatedSelfTest extends GridCacheAbstractReduceFieldsQuerySelfTest {
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return REPLICATED;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 3;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java
index 5c9acb5..88f1f1e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query;
 
 import java.util.List;
 import java.util.concurrent.Callable;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -102,7 +103,7 @@ public class IgniteSqlSchemaIndexingTest extends GridCommonAbstractTest {
 
                 return null;
             }
-        }, IgniteException.class, "Schema for cache already registered");
+        }, IgniteException.class, "Cache already registered: ");
     }
 
     /**
@@ -185,7 +186,7 @@ public class IgniteSqlSchemaIndexingTest extends GridCommonAbstractTest {
                 cache.query(qryWrong);
                 return null;
             }
-        }, IgniteException.class, "Failed to parse query");
+        }, CacheException.class, "Failed to parse query");
 
         SqlFieldsQuery qryCorrect = new SqlFieldsQuery("select f.\"id\", f.\"name\" " +
             "from \""+schemaName+"\".\"Fact\" f");

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index fd52469..8ab70ba 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -24,10 +24,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheKeyConfiguration;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -38,7 +43,9 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.springframework.util.StringUtils;
 
 /**
  * Tests for correct distributed partitioned queries.
@@ -52,6 +59,10 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        CacheKeyConfiguration keyCfg = new CacheKeyConfiguration(TestKey.class.getName(), "affKey");
+
+        cfg.setCacheKeyConfiguration(keyCfg);
+
         cfg.setPeerClassLoadingEnabled(false);
 
         TcpDiscoverySpi disco = new TcpDiscoverySpi();
@@ -97,6 +108,8 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
             Integer.class, Integer.class));
 
         try {
+            awaitPartitionMapExchange();
+
             List<Integer> res = new ArrayList<>();
 
             Random rnd = new GridRandom();
@@ -136,6 +149,8 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
             Integer.class, GroupIndexTestValue.class));
 
         try {
+            awaitPartitionMapExchange();
+
             // Check group index usage.
             String qry = "select 1 from GroupIndexTestValue ";
 
@@ -207,10 +222,659 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testDistributedJoins() throws Exception {
+        CacheConfiguration ccfg = cacheConfig("persOrg", true,
+            Integer.class, Person2.class, Integer.class, Organization.class);
+
+        IgniteCache<Integer, Object> c = ignite(0).getOrCreateCache(ccfg);
+
+        try {
+            awaitPartitionMapExchange();
+
+            doTestDistributedJoins(c, 30, 100, 1000, false);
+            doTestDistributedJoins(c, 30, 100, 1000, true);
+
+            doTestDistributedJoins(c, 3, 10, 3, false);
+            doTestDistributedJoins(c, 3, 10, 3, true);
+
+            doTestDistributedJoins(c, 300, 2000, 5, false);
+            doTestDistributedJoins(c, 300, 2000, 5, true);
+        }
+        finally {
+            c.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDistributedJoinsUnion() throws Exception {
+        CacheConfiguration ccfg = cacheConfig("persOrg", true,
+            Integer.class, Person2.class, Integer.class, Organization.class);
+
+        IgniteCache<Integer, Object> c = ignite(0).getOrCreateCache(ccfg);
+
+        try {
+            c.put(1, new Organization("o1"));
+            c.put(2, new Organization("o2"));
+            c.put(3, new Person2(1, "p1"));
+            c.put(4, new Person2(2, "p2"));
+            c.put(5, new Person2(3, "p3"));
+
+            String select = "select o.name n1, p.name n2 from Person2 p, Organization o where p.orgId = o._key and o._key=1" +
+                " union select o.name n1, p.name n2 from Person2 p, Organization o where p.orgId = o._key and o._key=2";
+
+            String plan = (String)c.query(new SqlFieldsQuery("explain " + select)
+                .setDistributedJoins(true).setEnforceJoinOrder(true))
+                .getAll().get(0).get(0);
+
+            X.println("Plan : " + plan);
+
+            assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched"));
+            assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched:unicast"));
+
+            assertEquals(2, c.query(new SqlFieldsQuery(select).setDistributedJoins(true)
+                .setEnforceJoinOrder(false)).getAll().size());
+
+            select = "select * from (" + select + ")";
+
+            plan = (String)c.query(new SqlFieldsQuery("explain " + select)
+                .setDistributedJoins(true).setEnforceJoinOrder(true))
+                .getAll().get(0).get(0);
+
+            X.println("Plan : " + plan);
+
+            assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched"));
+            assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched:unicast"));
+
+            assertEquals(2, c.query(new SqlFieldsQuery(select).setDistributedJoins(true)
+                .setEnforceJoinOrder(false)).getAll().size());
+        }
+        finally {
+            c.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDistributedJoinsUnionPartitionedReplicated() throws Exception {
+        CacheConfiguration ccfg1 = cacheConfig("pers", true,
+            Integer.class, Person2.class);
+        CacheConfiguration ccfg2 = cacheConfig("org", false,
+            Integer.class, Organization.class);
+
+        IgniteCache<Integer, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
+        IgniteCache<Integer, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
+
+        try {
+            c2.put(1, new Organization("o1"));
+            c2.put(2, new Organization("o2"));
+            c1.put(3, new Person2(1, "p1"));
+            c1.put(4, new Person2(2, "p2"));
+            c1.put(5, new Person2(3, "p3"));
+
+            String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1" +
+                " union select o.name n1, p.name n2 from \"org\".Organization o, \"pers\".Person2 p where p.orgId = o._key and o._key=2";
+
+            String plan = (String)c1.query(new SqlFieldsQuery("explain " + select0)
+                .setDistributedJoins(true))
+                .getAll().get(0).get(0);
+
+            X.println("Plan: " + plan);
+
+            assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
+            assertEquals(2, c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll().size());
+
+            String select = "select * from (" + select0 + ")";
+
+            plan = (String)c1.query(new SqlFieldsQuery("explain " + select)
+                .setDistributedJoins(true))
+                .getAll().get(0).get(0);
+
+            X.println("Plan : " + plan);
+
+            assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
+            assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size());
+
+            String select1 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1" +
+                " union select * from (select o.name n1, p.name n2 from \"org\".Organization o, \"pers\".Person2 p where p.orgId = o._key and o._key=2)";
+
+            plan = (String)c1.query(new SqlFieldsQuery("explain " + select1)
+                .setDistributedJoins(true)).getAll().get(0).get(0);
+
+            X.println("Plan: " + plan);
+
+            assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
+            assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size());
+
+            select = "select * from (" + select1 + ")";
+
+            plan = (String)c1.query(new SqlFieldsQuery("explain " + select)
+                .setDistributedJoins(true)).getAll().get(0).get(0);
+
+            X.println("Plan : " + plan);
+
+            assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
+            assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size());
+        }
+        finally {
+            c1.destroy();
+            c2.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDistributedJoinsPlan() throws Exception {
+        List<IgniteCache<Object, Object>> caches = new ArrayList<>();
+
+        IgniteCache<Object, Object> persPart =
+            ignite(0).createCache(cacheConfig("persPart", true, Integer.class, Person2.class));
+        caches.add(persPart);
+
+        IgniteCache<Object, Object> persPartAff =
+            ignite(0).createCache(cacheConfig("persPartAff", true, TestKey.class, Person2.class));
+        caches.add(persPartAff);
+
+        IgniteCache<Object, Object> orgPart =
+            ignite(0).createCache(cacheConfig("orgPart", true, Integer.class, Organization.class));
+        caches.add(orgPart);
+
+        IgniteCache<Object, Object> orgPartAff =
+            ignite(0).createCache(cacheConfig("orgPartAff", true, TestKey.class, Organization.class));
+        caches.add(orgPartAff);
+
+        IgniteCache<Object, Object> orgRepl =
+            ignite(0).createCache(cacheConfig("orgRepl", false, Integer.class, Organization.class));
+        caches.add(orgRepl);
+
+        IgniteCache<Object, Object> orgRepl2 =
+            ignite(0).createCache(cacheConfig("orgRepl2", false, Integer.class, Organization.class));
+        caches.add(orgRepl2);
+
+        try {
+            // Join two partitioned.
+
+            checkQueryPlan(persPart,
+                true,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, \"orgPart\".Organization o " +
+                    "where p.orgId = o._key",
+                "batched:unicast");
+
+            checkQueryPlan(persPart,
+                false,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, \"orgPartAff\".Organization o " +
+                    "where p.orgId = o.affKey",
+                "batched:unicast");
+
+            checkQueryPlan(persPart,
+                false,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, \"orgPart\".Organization o " +
+                    "where p.orgId = o._key",
+                "batched:unicast");
+
+            checkQueryPlan(persPart,
+                false,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p inner join \"orgPart\".Organization o " +
+                    "on p.orgId = o._key",
+                "batched:unicast");
+
+            checkQueryPlan(persPart,
+                false,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p left outer join \"orgPart\".Organization o " +
+                    "on p.orgId = o._key",
+                "batched:unicast");
+
+            checkQueryPlan(persPart,
+                true,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"orgPart\".Organization o, \"persPart\".Person2 p " +
+                    "where p.orgId = o._key",
+                "batched:broadcast");
+
+            checkQueryPlan(persPart,
+                true,
+                1,
+                "select p._key k1, o._key k2 " +
+                    "from \"orgPartAff\".Organization o, \"persPart\".Person2 p " +
+                    "where p.orgId = o.affKey",
+                "batched:broadcast");
+
+            // Join partitioned and replicated.
+
+            checkQueryPlan(persPart,
+                true,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, \"orgRepl\".Organization o " +
+                    "where p.orgId = o._key");
+
+            checkQueryPlan(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, \"orgRepl\".Organization o " +
+                    "where p.orgId = o._key");
+
+            checkQueryPlan(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p, (select * from \"orgRepl\".Organization) o " +
+                    "where p.orgId = o._key");
+
+            checkQueryPlan(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from (select * from \"orgRepl\".Organization) o, \"persPart\".Person2 p " +
+                    "where p.orgId = o._key");
+
+            checkQueryPlan(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p inner join \"orgRepl\".Organization o " +
+                    "on p.orgId = o._key");
+
+            checkQueryPlan(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from \"persPart\".Person2 p left outer join \"orgRepl\".Organization o " +
+                    "on p.orgId = o._key");
+
+            checkQueryPlan(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from \"orgRepl\".Organization o, \"persPart\".Person2 p " +
+                    "where p.orgId = o._key");
+
+            checkQueryPlan(persPart,
+                false,
+                0,
+                "select p._key k1, o._key k2 " +
+                    "from \"orgRepl\".Organization o inner join \"persPart\".Person2 p " +
+                    "on p.orgId = o._key");
+
+//            checkQueryPlan(persPart,
+//                true,
+//                1,
+//                "select p._key k1, o._key k2 " +
+//                    "from \"orgRepl\".Organization o left outer join \"persPart\".Person2 p " +
+//                    "on p.orgId = o._key",
+//                "batched:broadcast");
+
+            // Join on affinity keys.
+
+            checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
+                "\"persPart\".Person2 p",
+                "\"orgPart\".Organization o",
+                "where p._key = o._key", true);
+
+            checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
+                "\"persPart\".Person2 p",
+                "\"orgRepl\".Organization o",
+                "where p._key = o._key", true);
+
+            checkNoBatchedJoin(persPartAff, "select p._key k1, o._key k2 ",
+                "\"persPartAff\".Person2 p",
+                "\"orgPart\".Organization o",
+                "where p.affKey = o._key", true);
+
+            checkNoBatchedJoin(persPartAff, "select p._key k1, o._key k2 ",
+                "\"persPartAff\".Person2 p",
+                "\"orgRepl\".Organization o",
+                "where p.affKey = o._key", true);
+
+            checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
+                "(select * from \"persPart\".Person2) p",
+                "\"orgPart\".Organization o",
+                "where p._key = o._key", false);
+
+            checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
+                "\"persPart\".Person2 p",
+                "(select * from \"orgPart\".Organization) o",
+                "where p._key = o._key", false);
+
+            // Join multiple.
+
+            {
+                String sql = "select * from " +
+                    "(select o1._key k1, o2._key k2 from \"orgRepl\".Organization o1, \"orgRepl2\".Organization o2 where o1._key > o2._key) o, " +
+                    "\"persPart\".Person2 p where p.orgId = o.k1";
+
+                checkQueryPlan(persPart,
+                    false,
+                    0,
+                    sql);
+
+                checkQueryPlan(persPart,
+                    true,
+                    0,
+                    sql);
+
+                sql = "select o.k1, p1._key k2, p2._key k3 from " +
+                    "(select o1._key k1, o2._key k2 from \"orgRepl\".Organization o1, \"orgRepl2\".Organization o2 where o1._key > o2._key) o, " +
+                    "\"persPartAff\".Person2 p1, \"persPart\".Person2 p2 where p1._key=p2._key and p2.orgId = o.k1";
+
+                checkQueryPlan(persPart,
+                    false,
+                    1,
+                    sql,
+                    "persPartAff", "persPart", "batched:unicast", "orgRepl");
+
+                checkQueryFails(persPart, sql, true);
+
+                sql = "select o.k1, p._key k2 from " +
+                    "(select o1._key k1, p1._key k2 from \"orgRepl\".Organization o1, \"persPart\".Person2 p1 where o1._key = p1.orgId) o, " +
+                    "\"persPartAff\".Person2 p where p._key=o.k1";
+
+                checkQueryPlan(persPart,
+                    false,
+                    1,
+                    sql,
+                    "FROM \"persPart\"", "INNER JOIN \"orgRepl\"", "INNER JOIN \"persPartAff\"", "batched:broadcast");
+
+                checkQueryFails(persPart, sql, true);
+            }
+
+            {
+                String sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgPart\".Organization o " +
+                    "where p1.affKey=p2._key and p2.orgId = o._key";
+
+                checkQueryPlan(persPart,
+                    true,
+                    2,
+                    sql,
+                    "batched:unicast", "batched:unicast");
+
+                checkQueryPlan(persPart,
+                    false,
+                    2,
+                    sql,
+                    "batched:unicast", "batched:unicast");
+            }
+
+            {
+                String sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgPart\".Organization o " +
+                    "where p1.affKey > p2._key and p2.orgId = o._key";
+
+                checkQueryPlan(persPart,
+                    true,
+                    2,
+                    sql,
+                    "batched:broadcast", "batched:unicast");
+
+                checkQueryPlan(persPart,
+                    false,
+                    2,
+                    sql,
+                    "batched:broadcast", "batched:unicast");
+            }
+
+            {
+                // First join is collocated, second is replicated.
+
+                String sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " +
+                    "where p1.affKey=p2._key and p2.orgId = o._key";
+
+                checkQueryPlan(persPart,
+                    true,
+                    0,
+                    sql);
+
+                checkQueryPlan(persPart,
+                    false,
+                    0,
+                    sql);
+            }
+
+            {
+                String sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " +
+                    "where p1._key=p2._key and p2.orgId = o._key";
+
+                checkQueryPlan(persPart,
+                    false,
+                    1,
+                    sql,
+                    "batched:unicast");
+
+                sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
+                    "where p1._key=p2._key and p2.orgId = o._key";
+
+                checkQueryPlan(persPart,
+                    false,
+                    1,
+                    sql,
+                    "batched:unicast");
+
+                sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"persPartAff\".Person2 p1, \"orgRepl\".Organization o, \"persPart\".Person2 p2 " +
+                    "where p1._key=p2._key and p2.orgId = o._key";
+
+                checkQueryPlan(persPart,
+                    false,
+                    1,
+                    sql,
+                    "batched:unicast");
+
+                sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from (select * from \"orgRepl\".Organization) o, 
\"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
+                    "where p1._key=p2._key and p2.orgId = o._key";
+
+                checkQueryPlan(persPart,
+                    false,
+                    1,
+                    sql,
+                    "batched:unicast");
+            }
+        }
+        finally {
+            for (IgniteCache<Object, Object> cache : caches)
+                ignite(0).destroyCache(cache.getName());
+        }
+    }
+    /**
+     * Checks distributed joins with enforced join order for queries that involve the {@code orgRepl}
+     * table (presumably a replicated cache: it is the only one created here with the second
+     * {@code cacheConfig} argument set to {@code false}).
+     * <p>
+     * The three-table joins over {@code orgRepl}, {@code persPartAff} and {@code persPart}, in which
+     * {@code orgRepl} (directly or wrapped into a subquery) is not the last table of the FROM clause,
+     * are expected to fail with {@link CacheException}. The two-table queries without a join condition
+     * are expected to produce a plan with no batched joins.
+     * <p>
+     * All caches created by the test are destroyed at the end, whether or not the checks pass.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDistributedJoinsEnforceReplicatedNotLast() throws 
Exception {
+        List<IgniteCache<Object, Object>> caches = new ArrayList<>();
+
+        IgniteCache<Object, Object> persPart =
+            ignite(0).createCache(cacheConfig("persPart", true, Integer.class, 
Person2.class));
+        caches.add(persPart);
+
+        IgniteCache<Object, Object> persPartAff =
+            ignite(0).createCache(cacheConfig("persPartAff", true, 
TestKey.class, Person2.class));
+        caches.add(persPartAff);
+
+        IgniteCache<Object, Object> orgRepl =
+            ignite(0).createCache(cacheConfig("orgRepl", false, Integer.class, 
Organization.class));
+        caches.add(orgRepl);
+
+        try {
+            checkQueryFails(persPart, "select p1._key k1, p2._key k2, o._key 
k3 " +
+                "from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, 
\"persPart\".Person2 p2 " +
+                "where p1._key=p2._key and p2.orgId = o._key", true);
+
+            checkQueryFails(persPart, "select p1._key k1, p2._key k2, o._key 
k3 " +
+                "from \"persPartAff\".Person2 p1, \"orgRepl\".Organization o, 
\"persPart\".Person2 p2 " +
+                "where p1._key=p2._key and p2.orgId = o._key", true);
+
+            checkQueryFails(persPart, "select p1._key k1, p2._key k2, o._key 
k3 " +
+                "from \"persPartAff\".Person2 p1, (select * from 
\"orgRepl\".Organization) o, \"persPart\".Person2 p2 " +
+                "where p1._key=p2._key and p2.orgId = o._key", true);
+
+            checkQueryPlan(persPart,
+                true,
+                0,
+                "select p._key k1, o._key k2 from \"orgRepl\".Organization o, 
\"persPart\".Person2 p");
+
+            checkQueryPlan(persPart,
+                true,
+                0,
+                "select p._key k1, o._key k2 from \"orgRepl\".Organization o, 
\"persPart\".Person2 p union " +
+                    "select p._key k1, o._key k2 from \"persPart\".Person2 p, 
\"orgRepl\".Organization o");
+        }
+        finally {
+            for (IgniteCache<Object, Object> cache : caches)
+                ignite(0).destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * Asserts that running the given SQL with distributed joins enabled throws {@link CacheException}.
+     * Only {@code cache.query(...)} is invoked; the returned cursor is not iterated.
+     *
+     * @param cache Cache to run the query on.
+     * @param sql SQL text of the fields query.
+     * @param enforceJoinOrder Enforce join order flag set on the query.
+     */
+    private void checkQueryFails(final IgniteCache<Object, Object> cache,
+        String sql,
+        boolean enforceJoinOrder) {
+        final SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        qry.setDistributedJoins(true);
+        qry.setEnforceJoinOrder(enforceJoinOrder);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                cache.query(qry);
+
+                return null;
+            }
+        }, CacheException.class, null);
+    }
+
+    /**
+     * Checks that the plan of a two-table query contains no batched joins (expected count is {@code 0}),
+     * trying both orders of the two tables in the FROM clause.
+     * <p>
+     * The query text is built as {@code select + "from " + t1 + "," + t2 + " " + where}. The checks without
+     * enforced join order are always executed; the checks with enforced join order are optional.
+     *
+     * @param cache Query cache.
+     * @param select Select clause; it is concatenated directly with {@code "from "}, so it presumably
+     *      has to end with whitespace.
+     * @param cache1 First table expression placed into the FROM clause.
+     * @param cache2 Second table expression placed into the FROM clause.
+     * @param where Where clause.
+     * @param testEnforceJoinOrder If {@code true}, additionally tests both table orders with enforced
+     *      join order.
+     */
+    private void checkNoBatchedJoin(IgniteCache<Object, Object> cache,
+        String select,
+        String cache1,
+        String cache2,
+        String where,
+        boolean testEnforceJoinOrder) {
+        checkQueryPlan(cache,
+            false,
+            0,
+            select +
+                "from " + cache1 + ","  + cache2 + " "+ where);
+
+        checkQueryPlan(cache,
+            false,
+            0,
+            select +
+                "from " + cache2 + ","  + cache1 + " "+ where);
+
+        if (testEnforceJoinOrder) {
+            checkQueryPlan(cache,
+                true,
+                0,
+                select +
+                    "from " + cache1 + ","  + cache2 + " "+ where);
+
+            checkQueryPlan(cache,
+                true,
+                0,
+                select +
+                    "from " + cache2 + ","  + cache1 + " "+ where);
+        }
+    }
+
+    /**
+     * Checks the plan of the given SQL twice: once for the query as is and once for the same query
+     * wrapped into {@code select * from (...)}. Both checks use the same expectations.
+     *
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param expBatchedJoins Expected batched joins count.
+     * @param sql Query.
+     * @param expText Expected text to find in plan.
+     */
+    private void checkQueryPlan(IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        int expBatchedJoins,
+        String sql,
+        String...expText) {
+        checkQueryPlan(cache,
+            enforceJoinOrder,
+            expBatchedJoins,
+            new SqlFieldsQuery(sql),
+            expText);
+
+        checkQueryPlan(cache,
+            enforceJoinOrder,
+            expBatchedJoins,
+            new SqlFieldsQuery("select * from (" + sql + ")"),
+            expText);
+    }
+
+    /**
+     * Obtains the plan of the query with distributed joins enabled and verifies it.
+     * <p>
+     * The passed query object is modified: the enforce join order and distributed joins flags are set on it.
+     * The number of occurrences of {@code "batched"} in the plan must equal {@code expBatchedJoins}.
+     * Every string of {@code expText} must be found in the plan; each search starts one character past the
+     * start of the previous match, so the strings have to appear in the plan in the given order.
+     *
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param expBatchedJoins Expected batched joins count.
+     * @param qry Query; its flags are changed by this method.
+     * @param expText Expected text to find in plan, in order of appearance.
+     */
+    private void checkQueryPlan(IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        int expBatchedJoins,
+        SqlFieldsQuery qry,
+        String...expText) {
+        qry.setEnforceJoinOrder(enforceJoinOrder);
+        qry.setDistributedJoins(true);
+
+        String plan = queryPlan(cache, qry);
+
+        log.info("Plan: " + plan);
+
+        assertEquals("Unexpected number of batched joins in plan [plan=" + 
plan + ", qry=" + qry + ']',
+            expBatchedJoins,
+            StringUtils.countOccurrencesOf(plan, "batched"));
+
+        int startIdx = 0;
+
+        for (String exp : expText) {
+            int idx = plan.indexOf(exp, startIdx);
+
+            if (idx == -1) {
+                fail("Plan does not contain expected string [startIdx=" + 
startIdx +
+                    ", plan=" + plan +
+                    ", exp=" + exp + ']');
+            }
+
+            startIdx = idx + 1;
+        }
+    }
+
+    /**
      * Test HAVING clause.
      */
     public void testHaving() {
-        IgniteCache<Integer, Integer> c = 
ignite(0).getOrCreateCache(cacheConfig("ints", true,
+        IgniteCache<Integer, Integer> c = 
ignite(0).getOrCreateCache(cacheConfig("having", true,
             Integer.class, Integer.class));
 
         try {
@@ -256,6 +920,61 @@ public class IgniteSqlSplitterSelfTest extends 
GridCommonAbstractTest {
 
     /**
+     * Fills the cache with organizations and persons, runs a count query joining them with distributed
+     * joins enabled and checks both the plan and the result; then clears the cache and checks that the
+     * same query returns zero.
+     * <p>
+     * The cache must be empty on entry. Organizations get keys {@code [0, orgs)} and persons the keys
+     * that follow; each person references a random existing organization key, so the join count is
+     * expected to equal {@code persons}. The plan must contain {@code batched:broadcast} when join order
+     * is enforced and {@code batched:unicast} otherwise.
+     *
      * @param c Cache.
+     * @param orgs Number of organizations.
+     * @param persons Number of persons.
+     * @param pageSize Page size.
+     * @param enforceJoinOrder Enforce join order.
+     */
+    private void doTestDistributedJoins(IgniteCache<Integer, Object> c, int 
orgs, int persons, int pageSize,
+        boolean enforceJoinOrder) {
+        assertEquals(0, c.size(CachePeekMode.ALL));
+
+        int key = 0;
+
+        for (int i = 0; i < orgs; i++) {
+            Organization o = new Organization();
+
+            o.name = "Org" + i;
+
+            c.put(key++, o);
+        }
+
+        Random rnd = new GridRandom();
+
+        for (int i = 0; i < persons; i++) {
+            Person2 p = new Person2();
+
+            p.name = "Person" + i;
+            p.orgId = rnd.nextInt(orgs);
+
+            c.put(key++, p);
+        }
+
+        String select = "select count(*) from Organization o, Person2 p where 
p.orgId = o._key";
+
+        String plan = (String)c.query(new SqlFieldsQuery("explain " + select)
+            
.setDistributedJoins(true).setEnforceJoinOrder(enforceJoinOrder).setPageSize(pageSize))
+            .getAll().get(0).get(0);
+
+        X.println("Plan : " + plan);
+
+        if (enforceJoinOrder)
+            assertTrue(plan, plan.contains("batched:broadcast"));
+        else
+            assertTrue(plan, plan.contains("batched:unicast"));
+
+        assertEquals(Long.valueOf(persons), c.query(new 
SqlFieldsQuery(select).setDistributedJoins(true)
+            
.setEnforceJoinOrder(enforceJoinOrder).setPageSize(pageSize)).getAll().get(0).get(0));
+
+        c.clear();
+
+        assertEquals(0, c.size(CachePeekMode.ALL));
+        assertEquals(0L, c.query(new 
SqlFieldsQuery(select).setDistributedJoins(true)
+            
.setEnforceJoinOrder(enforceJoinOrder).setPageSize(pageSize)).getAll().get(0).get(0));
+    }
+
+    /**
+     * @param c Cache.
      * @param qry Query.
      * @param args Arguments.
      * @return Column as list.
@@ -282,8 +1001,6 @@ public class IgniteSqlSplitterSelfTest extends 
GridCommonAbstractTest {
      *
      */
     public void testFunctionNpe() {
-        assert false : "https://issues.apache.org/jira/browse/IGNITE-1886";;
-
         IgniteCache<Integer, User> userCache = ignite(0).createCache(
             cacheConfig("UserCache", true, Integer.class, User.class));
         IgniteCache<Integer, UserOrder> userOrderCache = ignite(0).createCache(
@@ -391,9 +1108,11 @@ public class IgniteSqlSplitterSelfTest extends 
GridCommonAbstractTest {
      * Test value.
      */
     private static class GroupIndexTestValue implements Serializable {
+        /** */
         @QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", 
order = 0))
         private int a;
 
+        /** */
         @QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", 
order = 1))
         private int b;
 
@@ -407,23 +1126,131 @@ public class IgniteSqlSplitterSelfTest extends 
GridCommonAbstractTest {
         }
     }
 
+    /**
+     * Person test value with an indexed organization ID and a name, both exposed as SQL fields.
+     */
+    private static class Person2 implements Serializable {
+        /** Organization ID; indexed SQL field. */
+        @QuerySqlField(index = true)
+        int orgId;
+
+        /** Name; SQL field. */
+        @QuerySqlField
+        String name;
+
+        /**
+         * Creates a person whose fields keep their default values.
+         */
+        public Person2() {
+            // No-op.
+        }
+
+        /**
+         * @param orgId Organization ID.
+         * @param name Name.
+         */
+        public Person2(int orgId, String name) {
+            this.orgId = orgId;
+            this.name = name;
+        }
+    }
+
+    /**
+     * Test cache key made of an affinity key and an ID, both exposed as SQL fields.
+     * Equality and hash code are based on {@code id} only; {@code affKey} does not participate.
+     */
+    private static class TestKey implements Serializable {
+        /** Affinity key; indexed SQL field annotated with {@code @AffinityKeyMapped}. */
+        @QuerySqlField(index = true)
+        @AffinityKeyMapped
+        int affKey;
+
+        /** ID; SQL field, the only field used by {@code equals} and {@code hashCode}. */
+        @QuerySqlField()
+        int id;
+
+        /**
+         * @param affKey Affinity key.
+         * @param id ID.
+         */
+        public TestKey(int affKey, int id) {
+            this.affKey = affKey;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey personKey = (TestKey)o;
+
+            return id == personKey.id;
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+
+    /**
+     * Organization test value with a single SQL field holding its name.
+     */
+    private static class Organization implements Serializable {
+        /** Organization name; SQL field. */
+        @QuerySqlField
+        String name;
+
+        /**
+         * Creates an organization whose name is left unset.
+         */
+        public Organization() {
+            // No-op.
+        }
+
+        /**
+         * @param name Organization name.
+         */
+        public Organization(String name) {
+            this.name = name;
+        }
+    }
+
+    /**
+     * Test value with a single SQL field {@code id}; value class of the cache created as "UserCache".
+     */
     private static class User implements Serializable {
+        /** SQL field {@code id}. */
         @QuerySqlField
         private int id;
     }
 
+    /**
+     * Test value with two SQL fields, {@code id} and {@code userId}.
+     */
     private static class UserOrder implements Serializable {
+        /** SQL field {@code id}. */
         @QuerySqlField
         private int id;
 
+        /** SQL field {@code userId}. */
         @QuerySqlField
         private int userId;
     }
 
+    /**
+     * Test value with two SQL fields, {@code orderId} and {@code goodId}.
+     */
     private static class OrderGood implements Serializable {
+        /** SQL field {@code orderId}. */
         @QuerySqlField
         private int orderId;
 
+        /** SQL field {@code goodId}. */
         @QuerySqlField
         private int goodId;
     }

Reply via email to