This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b03a970  IGNITE-10307: SQL: Partition pruning for joins. This closes 
#5774.
b03a970 is described below

commit b03a9705b144196bfde631328562008eb9fa48a6
Author: devozerov <voze...@gridgain.com>
AuthorDate: Thu Jan 17 18:47:59 2019 +0300

    IGNITE-10307: SQL: Partition pruning for joins. This closes #5774.
---
 .../processors/query/h2/IgniteH2Indexing.java      |  110 +-
 ...ode.java => PartitionAffinityFunctionType.java} |   35 +-
 .../query/h2/affinity/PartitionAllNode.java        |    5 +
 .../query/h2/affinity/PartitionCompositeNode.java  |  114 +-
 .../query/h2/affinity/PartitionConstantNode.java   |    6 +-
 .../query/h2/affinity/PartitionExtractor.java      |  401 ++++--
 .../query/h2/affinity/PartitionGroupNode.java      |   23 +-
 .../query/h2/affinity/PartitionJoinCondition.java  |  132 ++
 .../query/h2/affinity/PartitionJoinGroup.java      |   81 ++
 .../query/h2/affinity/PartitionNode.java           |    5 +
 .../query/h2/affinity/PartitionNoneNode.java       |    5 +
 .../query/h2/affinity/PartitionParameterNode.java  |    2 +-
 .../query/h2/affinity/PartitionResult.java         |   25 +-
 .../query/h2/affinity/PartitionSingleNode.java     |   21 +-
 .../query/h2/affinity/PartitionTable.java          |  113 ++
 .../affinity/PartitionTableAffinityDescriptor.java |   97 ++
 .../h2/affinity/PartitionTableDescriptor.java      |   73 --
 .../query/h2/affinity/PartitionTableModel.java     |  157 +++
 .../processors/query/h2/opt/GridH2Table.java       |  161 ++-
 .../query/h2/sql/GridSqlQuerySplitter.java         |   20 +-
 .../query/h2/twostep/GridReduceQueryExecutor.java  |    5 +-
 .../BetweenOperationExtractPartitionSelfTest.java  |   18 -
 .../h2/twostep/JoinPartitionPruningSelfTest.java   | 1303 ++++++++++++++++++++
 .../IgniteBinaryCacheQueryTestSuite.java           |    2 +
 24 files changed, 2591 insertions(+), 323 deletions(-)

diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 0c1b1d6..5e00aea 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -92,7 +92,8 @@ import 
org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
 import org.apache.ignite.internal.processors.query.RunningQueryManager;
 import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
-import org.apache.ignite.internal.processors.query.h2.affinity.PartitionNode;
+import 
org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor;
+import org.apache.ignite.internal.processors.query.h2.affinity.PartitionResult;
 import 
org.apache.ignite.internal.processors.query.h2.database.H2TreeClientIndex;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
 import 
org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerIO;
@@ -139,6 +140,7 @@ import 
org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
@@ -248,6 +250,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** */
     private DdlStatementsProcessor ddlProc;
 
+    /** Partition extractor. */
+    private PartitionExtractor partExtractor;
+
     /** */
     private final RunningQueryManager runningQueryMgr = new 
RunningQueryManager();
 
@@ -2006,46 +2011,27 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         boolean cursorCreated = false;
 
         try {
-            // TODO: Use intersection 
(https://issues.apache.org/jira/browse/IGNITE-10567)
-            int partitions[] = qry.getPartitions();
-
-            if (partitions == null && twoStepQry.derivedPartitions() != null) {
-                try {
-                    PartitionNode partTree = 
twoStepQry.derivedPartitions().tree();
-
-                    Collection<Integer> partitions0 = 
partTree.apply(qry.getArgs());
-
-                    if (F.isEmpty(partitions0))
-                        partitions = new int[0];
-                    else {
-                        partitions = new int[partitions0.size()];
-
-                        int i = 0;
-
-                        for (Integer part : partitions0)
-                            partitions[i++] = part;
-                    }
-
-                    if (partitions.length == 0) { //here we know that result 
of requested query is empty
-                        return new QueryCursorImpl<List<?>>(new 
Iterable<List<?>>() {
-                            @Override public Iterator<List<?>> iterator() {
-                                return new Iterator<List<?>>() {
-                                    @Override public boolean hasNext() {
-                                        return false;
-                                    }
+            // When explicit partitions are set, there must be an owning cache 
they should be applied to.
+            int explicitParts[] = qry.getPartitions();
+            PartitionResult derivedParts = twoStepQry.derivedPartitions();
+
+            int parts[] = calculatePartitions(explicitParts, derivedParts, 
qry.getArgs());
+
+            if (parts != null && parts.length == 0) {
+                return new QueryCursorImpl<>(new Iterable<List<?>>() {
+                    @Override public Iterator<List<?>> iterator() {
+                        return new Iterator<List<?>>() {
+                            @Override public boolean hasNext() {
+                                return false;
+                            }
 
-                                    @Override public List<?> next() {
-                                        return null;
-                                    }
-                                };
+                            
@SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException")
+                            @Override public List<?> next() {
+                                return null;
                             }
-                        });
+                        };
                     }
-                }
-                catch (IgniteCheckedException e) {
-                    throw new CacheException("Failed to calculate derived 
partitions: [qry=" + qry.getSql() +
-                        ", params=" + Arrays.deepToString(qry.getArgs()) + 
"]", e);
-                }
+                });
             }
 
             Iterable<List<?>> iter = runQueryTwoStep(
@@ -2057,7 +2043,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 qry.getTimeout(),
                 cancel,
                 qry.getArgs(),
-                partitions,
+                parts,
                 qry.isLazy(),
                 mvccTracker
             );
@@ -2079,6 +2065,43 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
+     * Calculate partitions for the query.
+     *
+     * @param explicitParts Explicit partitions provided in 
SqlFieldsQuery.partitions property.
+     * @param derivedParts Derived partitions found during partition pruning.
+     * @param args Arguments.
+     * @return Calculated partitions or {@code null} if failed to calculate 
and there should be a broadcast.
+     */
+    @SuppressWarnings("ZeroLengthArrayAllocation")
+    private int[] calculatePartitions(int[] explicitParts, PartitionResult 
derivedParts, Object[] args) {
+        if (!F.isEmpty(explicitParts))
+            return explicitParts;
+        else if (derivedParts != null) {
+            try {
+                Collection<Integer> realParts = 
derivedParts.tree().apply(args);
+
+                if (F.isEmpty(realParts))
+                    return IgniteUtils.EMPTY_INTS;
+                else {
+                    int[] realParts0 = new int[realParts.size()];
+
+                    int i = 0;
+
+                    for (Integer realPart : realParts)
+                        realParts0[i++] = realPart;
+
+                    return realParts0;
+                }
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException("Failed to calculate derived 
partitions for query.", e);
+            }
+        }
+
+        return null;
+    }
+
+    /**
      * Do initial parsing of the statement and create query caches, if needed.
      * @param c Connection.
      * @param sqlQry Query.
@@ -2375,6 +2398,8 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         dmlProc = new DmlStatementsProcessor(ctx, this);
         ddlProc = new DdlStatementsProcessor(ctx, schemaMgr);
 
+        partExtractor = new PartitionExtractor(this);
+
         if (JdbcUtils.serializer != null)
             U.warn(log, "Custom H2 serialization is already configured, will 
override.");
 
@@ -2670,6 +2695,13 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
+     * @return Partition extractor.
+     */
+    public PartitionExtractor partitionExtractor() {
+        return partExtractor;
+    }
+
+    /**
      * Collect cache identifiers from two-step query.
      *
      * @param mainCacheId Id of main cache.
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAffinityFunctionType.java
similarity index 62%
copy from 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
copy to 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAffinityFunctionType.java
index 238739c..4c88fcb 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAffinityFunctionType.java
@@ -17,29 +17,32 @@
 
 package org.apache.ignite.internal.processors.query.h2.affinity;
 
-import org.apache.ignite.IgniteCheckedException;
-
-import java.util.Collection;
-
 /**
- * Common node of partition tree.
+ * Affinity function type.
  */
-public interface PartitionNode {
+public enum PartitionAffinityFunctionType {
+    /** Custom affintiy function. */
+    CUSTOM(0),
+
+    /** Rendezvous affinity function. */
+    RENDEZVOUS(1);
+
+    /** Value. */
+    private final int val;
+
     /**
-     * Get partitions.
+     * Constructor.
      *
-     * @param args Query arguments.
-     * @return Partitions.
-     * @throws IgniteCheckedException If failed.
+     * @param val Value.
      */
-    Collection<Integer> apply(Object... args) throws IgniteCheckedException;
+    PartitionAffinityFunctionType(int val) {
+        this.val = val;
+    }
 
     /**
-     * Try optimizing partition nodes into a simpler form.
-     *
-     * @return Optimized node or {@code this} if optimization failed.
+     * @return Value.
      */
-    default PartitionNode optimize() {
-        return this;
+    public int value() {
+        return val;
     }
 }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java
index 842d82c..30860f5 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionAllNode.java
@@ -41,6 +41,11 @@ public class PartitionAllNode implements PartitionNode {
     }
 
     /** {@inheritDoc} */
+    @Override public int joinGroup() {
+        return PartitionTableModel.GRP_NONE;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(PartitionAllNode.class, this);
     }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java
index 2cb330f..45ceaaf 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionCompositeNode.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.query.h2.affinity;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 import java.util.Collection;
@@ -65,6 +66,8 @@ public class PartitionCompositeNode implements PartitionNode {
                 return null;
 
             // (A, B) and (B, C) -> (B)
+            leftParts = new HashSet<>(leftParts);
+
             leftParts.retainAll(rightParts);
         }
         else {
@@ -77,6 +80,8 @@ public class PartitionCompositeNode implements PartitionNode {
                 return leftParts;
 
             // (A, B) or (B, C) -> (A, B, C)
+            leftParts = new HashSet<>(leftParts);
+
             leftParts.addAll(rightParts);
         }
 
@@ -84,6 +89,12 @@ public class PartitionCompositeNode implements PartitionNode 
{
     }
 
     /** {@inheritDoc} */
+    @Override public int joinGroup() {
+        // Similar to group node, we cannot cache join group value here as it 
may be changed dynamically.
+        return left.joinGroup();
+    }
+
+    /** {@inheritDoc} */
     @Override public PartitionNode optimize() {
         PartitionNode left = this.left;
         PartitionNode right = this.right;
@@ -103,9 +114,15 @@ public class PartitionCompositeNode implements 
PartitionNode {
             return optimizeSpecial(right, left);
 
         // If one of child nodes cannot be optimized, nothing can be done 
further.
-        // Note that we cannot return "this" here because left or right parts 
might have been optimized.
-        if (left instanceof PartitionCompositeNode || right instanceof 
PartitionCompositeNode)
+        // Note that we cannot return "this" here because left or right parts 
might have been changed.
+        if (left instanceof PartitionCompositeNode || right instanceof 
PartitionCompositeNode) {
+            // Should be "NONE" for AND in fact, but this would violate 
current non-collocated join semantics as
+            // explained in "optimizeSimpleAnd" method below.
+            if (left.joinGroup() != right.joinGroup())
+                return PartitionAllNode.INSTANCE;
+
             return new PartitionCompositeNode(left, right, op);
+        }
 
         // Try optimizing composite nodes.
         if (left instanceof PartitionGroupNode)
@@ -182,6 +199,11 @@ public class PartitionCompositeNode implements 
PartitionNode {
     private PartitionNode optimizeGroupAnd(PartitionGroupNode left, 
PartitionNode right) {
         assert op == PartitionCompositeNodeOperator.AND;
 
+        // Should be "NONE" for AND in fact, but this would violate current 
non-collocated join semantics as
+        // explained in "optimizeSimpleAnd" method below.
+        if (left.joinGroup() != right.joinGroup())
+            return PartitionAllNode.INSTANCE;
+
         // Optimistic check whether both sides are equal.
         if (right instanceof PartitionGroupNode) {
             PartitionGroupNode right0 = (PartitionGroupNode)right;
@@ -206,22 +228,50 @@ public class PartitionCompositeNode implements 
PartitionNode {
             }
 
             if (rightConsts != null) {
-                // {A, B) and (B, C) -> (B).
-                consts.retainAll(rightConsts);
-
-                if (consts.isEmpty())
-                    // {A, B) and (C, D) -> NONE.
-                    return PartitionNoneNode.INSTANCE;
-                else if (consts.size() == 1)
+                // Try to merge nodes if they belong to the same table.
+                boolean sameTbl = true;
+                String curTblAlias = null;
+
+                for (PartitionSingleNode curConst : consts) {
+                    if (curTblAlias == null)
+                        curTblAlias = curConst.table().alias();
+                    else if (!F.eq(curTblAlias, curConst.table().alias())) {
+                        sameTbl = false;
+
+                        break;
+                    }
+                }
+
+                if (sameTbl) {
+                    for (PartitionSingleNode curConst : rightConsts) {
+                        if (curTblAlias == null)
+                            curTblAlias = curConst.table().alias();
+                        else if (!F.eq(curTblAlias, curConst.table().alias())) 
{
+                            sameTbl = false;
+
+                            break;
+                        }
+                    }
+                }
+
+                if (sameTbl) {
                     // {A, B) and (B, C) -> (B).
-                    return consts.iterator().next();
-                else
-                    // {A, B, C) and (B, C, D) -> (B, C).
-                    return new PartitionGroupNode(consts);
+                    consts.retainAll(rightConsts);
+
+                    if (consts.isEmpty())
+                        // {A, B) and (C, D) -> NONE.
+                        return PartitionNoneNode.INSTANCE;
+                    else if (consts.size() == 1)
+                        // {A, B) and (B, C) -> (B).
+                        return consts.iterator().next();
+                    else
+                        // {A, B, C) and (B, C, D) -> (B, C).
+                        return new PartitionGroupNode(consts);
+                }
             }
         }
 
-        // Otherwise it is a mixed set of concrete partitions and arguments. 
Cancel optimization.
+        // Otherwise it is a mixed set of concrete partitions and arguments 
possibly from different caches.
         // Note that in fact we can optimize expression to certain extent 
(e.g. (A) and (B, :C) -> (A) and (:C)),
         // but resulting expression is always composite node still, which 
cannot be optimized on upper levels.
         // So we skip any fine-grained optimization in favor of simplicity.
@@ -238,6 +288,10 @@ public class PartitionCompositeNode implements 
PartitionNode {
     private PartitionNode optimizeGroupOr(PartitionGroupNode left, 
PartitionNode right) {
         assert op == PartitionCompositeNodeOperator.OR;
 
+        // Cannot merge disjunctive nodes if they belong to different join 
groups.
+        if (left.joinGroup() != right.joinGroup())
+            return PartitionAllNode.INSTANCE;
+
         HashSet<PartitionSingleNode> siblings = new HashSet<>(left.siblings());
 
         if (right instanceof PartitionSingleNode)
@@ -278,18 +332,28 @@ public class PartitionCompositeNode implements 
PartitionNode {
     private PartitionNode optimizeSimpleAnd(PartitionSingleNode left, 
PartitionSingleNode right) {
         assert op == PartitionCompositeNodeOperator.AND;
 
+        // Currently we do not merge such nodes because it may violate 
existing broken (!!!) join semantics.
+        // Normally, if we have two non-collocated partition sets, then this 
should be an empty set for collocated
+        // query mode. Unfortunately, current semantics of collocated query 
mode assume that even though both sides
+        // of expression are located on random nodes, there is a slight chance 
that they may accidentally reside on
+        // a single node and hence return some rows. We return "ALL" here to 
keep this broken semantics consistent
+        // irrespective of whether partition pruning is used or not. Once 
non-collocated joins are fixed, this
+        // condition will be changed to "NONE".
+        if (left.joinGroup() != right.joinGroup())
+            return PartitionAllNode.INSTANCE;
+
         // Check if both sides are equal.
         if (left.equals(right))
             // (X) and (X) -> X
             // (:X) and (:X) -> :X
             return left;
 
-        // If both sides are constants, and they are not equal, this is empty 
set.
-        if (left.constant() && right.constant())
+        // If both sides are constants from the same table and they are not 
equal, this is empty set.
+        if (left.constant() && right.constant() && F.eq(left.table().alias(), 
right.tbl.alias()))
             // X and Y -> NONE
             return PartitionNoneNode.INSTANCE;
 
-        // Otherwise it is a mixed set, cannot reduce.
+        // Otherwise this is a mixed set, cannot reduce.
         // X and :Y -> (X) AND (:Y)
         return new PartitionCompositeNode(left, right, 
PartitionCompositeNodeOperator.AND);
     }
@@ -304,7 +368,21 @@ public class PartitionCompositeNode implements 
PartitionNode {
     private PartitionNode optimizeSimpleOr(PartitionSingleNode left, 
PartitionSingleNode right) {
         assert op == PartitionCompositeNodeOperator.OR;
 
-        return left.equals(right) ? left : PartitionGroupNode.merge(left, 
right);
+        // Cannot merge disjunctive nodes if they belong to different join 
groups.
+        if (left.joinGroup() != right.joinGroup())
+            return PartitionAllNode.INSTANCE;
+
+        // (A) or (A) -> (A)
+        if (left.equals(right))
+            return left;
+
+        // (A) or (B) -> (A, B)
+        HashSet<PartitionSingleNode> nodes = new HashSet<>();
+
+        nodes.add(left);
+        nodes.add(right);
+
+        return new PartitionGroupNode(nodes);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java
index 9efafe4..9e258ae 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionConstantNode.java
@@ -29,11 +29,11 @@ public class PartitionConstantNode extends 
PartitionSingleNode {
     /**
      * Constructor.
      *
-     * @param resolver Resolver.
+     * @param tbl Table.
      * @param part Partition.
      */
-    public PartitionConstantNode(PartitionTableDescriptor resolver, int part) {
-        super(resolver);
+    public PartitionConstantNode(PartitionTable tbl, int part) {
+        super(tbl);
 
         this.part = part;
     }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java
index 12549ce..0898e63 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionExtractor.java
@@ -21,8 +21,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
@@ -30,6 +34,7 @@ import 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlColumn;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlElement;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlJoin;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperation;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlParameter;
@@ -41,6 +46,9 @@ import org.h2.table.Column;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collections;
+
 /**
  * Partition tree extractor.
  */
@@ -84,14 +92,11 @@ public class PartitionExtractor {
 
         GridSqlSelect select = (GridSqlSelect)qry;
 
-        // Currently we can extract data only from a single table.
-        GridSqlTable tbl = unwrapTable(select.from());
-
-        if (tbl == null)
-            return null;
+        // Prepare table model.
+        PartitionTableModel tblModel = prepareTableModel(select.from());
 
         // Do extract.
-        PartitionNode tree = extractFromExpression(select.where());
+        PartitionNode tree = extractFromExpression(select.where(), tblModel, 
false);
 
         assert tree != null;
 
@@ -101,10 +106,8 @@ public class PartitionExtractor {
         if (tree instanceof PartitionAllNode)
             return null;
 
-        // Return.
-        PartitionTableDescriptor desc = descriptor(tbl.dataTable());
-
-        return new PartitionResult(desc, tree);
+        // Done.
+        return new PartitionResult(tree, 
tblModel.joinGroupAffinity(tree.joinGroup()));
     }
 
     /**
@@ -114,21 +117,25 @@ public class PartitionExtractor {
      * @return Partition result or {@code null} if nothing is resolved.
      */
     @SuppressWarnings("IfMayBeConditional")
-    public PartitionResult merge(List<GridCacheSqlQuery> qrys) {
+    public PartitionResult mergeMapQueries(List<GridCacheSqlQuery> qrys) {
         // Check if merge is possible.
-        PartitionTableDescriptor desc = null;
+        PartitionTableAffinityDescriptor aff = null;
 
         for (GridCacheSqlQuery qry : qrys) {
             PartitionResult qryRes = (PartitionResult)qry.derivedPartitions();
 
+            // Failed to get results for one query -> broadcast.
             if (qryRes == null)
-                // Failed to get results for one query -> broadcast.
                 return null;
 
-            if (desc == null)
-                desc = qryRes.descriptor();
-            else if (!F.eq(desc, qryRes.descriptor()))
-                // Queries refer to different tables, cannot merge -> 
broadcast.
+            // This only possible if query is resolved to "NONE". Will be 
skipped later during map request prepare.
+            if (qryRes.affinity() == null)
+                continue;
+
+            if (aff == null)
+                aff = qryRes.affinity();
+            else if (!aff.isCompatible(qryRes.affinity()))
+                // Queries refer to incompatible affinity groups, cannot merge 
-> broadcast.
                 return null;
         }
 
@@ -152,33 +159,232 @@ public class PartitionExtractor {
         if (tree instanceof PartitionAllNode)
             return null;
 
-        return new PartitionResult(desc, tree);
+        // If there is no affinity, then we assume "NONE" result.
+        assert aff != null || tree == PartitionNoneNode.INSTANCE;
+
+        return new PartitionResult(tree, aff);
     }
 
     /**
-     * Try unwrapping the table.
+     * Prepare table model.
      *
-     * @param from From.
-     * @return Table or {@code null} if not a table.
+     * @param from FROM clause.
+     * @return Join model.
      */
-    @Nullable private static GridSqlTable unwrapTable(GridSqlAst from) {
-        if (from instanceof GridSqlAlias)
-            from = from.child();
+    private PartitionTableModel prepareTableModel(GridSqlAst from) {
+        PartitionTableModel res = new PartitionTableModel();
+
+        prepareTableModel0(from, res);
 
-        if (from instanceof GridSqlTable)
-            return (GridSqlTable)from;
+        return res;
+    }
+
+    /**
+     * Prepare tables which will be used in join model.
+     *
+     * @param from From flag.
+     * @param model Table model.
+     * @return {@code True} if extracted tables successfully, {@code false} if 
failed to extract.
+     */
+    private List<PartitionTable> prepareTableModel0(GridSqlAst from, 
PartitionTableModel model) {
+        if (from instanceof GridSqlJoin) {
+            // Process JOIN recursively.
+            GridSqlJoin join = (GridSqlJoin)from;
+
+            List<PartitionTable> leftTbls = 
prepareTableModel0(join.leftTable(), model);
+            List<PartitionTable> rightTbls = 
prepareTableModel0(join.rightTable(), model);
+
+            if (join.isLeftOuter()) {
+                // "a LEFT JOIN b" is transformed into "a", and "b" is put 
into special stop-list.
+                // If a condition is met on "b" afterwards, we will ignore it.
+                for (PartitionTable rightTbl : rightTbls)
+                    model.addExcludedTable(rightTbl.alias());
+
+                return leftTbls;
+            }
+
+            // Extract equi-join or cross-join from condition. For normal 
INNER JOINs most likely we will have "1=1"
+            // cross join here, real join condition will be found in WHERE 
clause later.
+            PartitionJoinCondition cond = parseJoinCondition(join.on());
+
+            if (cond != null && !cond.cross())
+                model.addJoin(cond);
+
+            ArrayList<PartitionTable> res = new ArrayList<>(leftTbls.size() + 
rightTbls.size());
+
+            res.addAll(leftTbls);
+            res.addAll(rightTbls);
+
+            return res;
+        }
+
+        PartitionTable tbl = prepareTable(from, model);
+
+        return tbl != null ? Collections.singletonList(tbl) : 
Collections.emptyList();
+    }
+
+    /**
+     * Try parsing condition as simple JOIN codition. Only equijoins are 
supported for now, so anything more complex
+     * than "A.a = B.b" are not processed.
+     *
+     * @param on Initial AST.
+     * @return Join condition or {@code null} if not simple equijoin.
+     */
+    private static PartitionJoinCondition parseJoinCondition(GridSqlElement 
on) {
+        if (on instanceof GridSqlOperation) {
+            GridSqlOperation on0 = (GridSqlOperation)on;
+
+            if (on0.operationType() == GridSqlOperationType.EQUAL) {
+                // Check for cross-join first.
+                GridSqlConst leftConst = unwrapConst(on0.child(0));
+                GridSqlConst rightConst = unwrapConst(on0.child(1));
+
+                if (leftConst != null && rightConst != null) {
+                    try {
+                        int leftConstval = leftConst.value().getInt();
+                        int rightConstVal = rightConst.value().getInt();
+
+                        if (leftConstval == rightConstVal)
+                            return PartitionJoinCondition.CROSS;
+                    }
+                    catch (Exception ignore) {
+                        // No-op.
+                    }
+                }
+
+                // This is not cross-join, neither normal join between columns.
+                if (leftConst != null || rightConst != null)
+                    return null;
+
+                // Check for normal equi-join.
+                GridSqlColumn left = unwrapColumn(on0.child(0));
+                GridSqlColumn right = unwrapColumn(on0.child(1));
+
+                if (left != null && right != null) {
+                    String leftAlias = left.tableAlias();
+                    String rightAlias = right.tableAlias();
+
+                    String leftCol = left.columnName();
+                    String rightCol = right.columnName();
+
+                    return new PartitionJoinCondition(leftAlias, rightAlias, 
leftCol, rightCol);
+                }
+            }
+        }
 
         return null;
     }
 
     /**
+     * Prepare single table.
+     *
+     * @param from Expression.
+     * @param tblModel Table model.
+     * @return Added table or {@code null} if table is exlcuded from the model.
+     */
+    private static PartitionTable prepareTable(GridSqlAst from, 
PartitionTableModel tblModel) {
+        // Unwrap alias. We assume that every table must be aliased.
+        assert from instanceof GridSqlAlias;
+
+        String alias = ((GridSqlAlias)from).alias();
+
+        from = from.child();
+
+        if (from instanceof GridSqlTable) {
+            // Normal table.
+            GridSqlTable from0 = (GridSqlTable)from;
+
+            GridH2Table tbl0 = from0.dataTable();
+
+            // Unknown table type, e.g. temp table.
+            if (tbl0 == null) {
+                tblModel.addExcludedTable(alias);
+
+                return null;
+            }
+
+            String cacheName = tbl0.cacheName();
+
+            String affColName = null;
+            String secondAffColName = null;
+
+            for (Column col : tbl0.getColumns()) {
+                if (tbl0.isColumnForPartitionPruningStrict(col)) {
+                    if (affColName == null)
+                        affColName = col.getName();
+                    else {
+                        secondAffColName = col.getName();
+
+                        // Break as we cannot have more than two affinity key 
columns.
+                        break;
+                    }
+                }
+            }
+
+            PartitionTable tbl = new PartitionTable(alias, cacheName, 
affColName, secondAffColName);
+
+            PartitionTableAffinityDescriptor aff = 
affinityForCache(tbl0.cacheInfo().config());
+
+            if (aff == null) {
+                // Non-standard affinity, exclude table.
+                tblModel.addExcludedTable(alias);
+
+                return null;
+            }
+
+            tblModel.addTable(tbl, aff);
+
+            return tbl;
+        }
+        else {
+            // Subquery/union/view, etc.
+            assert alias != null;
+
+            tblModel.addExcludedTable(alias);
+
+            return null;
+        }
+    }
+
+    /**
+     * Prepare affinity identifier for cache.
+     *
+     * @param ccfg Cache configuration.
+     * @return Affinity identifier.
+     */
+    private static PartitionTableAffinityDescriptor 
affinityForCache(CacheConfiguration ccfg) {
+        // Partition could be extracted only from PARTITIONED caches.
+        if (ccfg.getCacheMode() != CacheMode.PARTITIONED)
+            return null;
+
+        PartitionAffinityFunctionType aff = 
ccfg.getAffinity().getClass().equals(RendezvousAffinityFunction.class) ?
+            PartitionAffinityFunctionType.RENDEZVOUS : 
PartitionAffinityFunctionType.CUSTOM;
+
+        boolean hasNodeFilter = ccfg.getNodeFilter() != null &&
+            !(ccfg.getNodeFilter() instanceof 
CacheConfiguration.IgniteAllNodesPredicate);
+
+        return new PartitionTableAffinityDescriptor(
+            aff,
+            ccfg.getAffinity().partitions(),
+            hasNodeFilter,
+            ccfg.getDataRegionName()
+        );
+    }
+
+    /**
      * Extract partitions from expression.
      *
      * @param expr Expression.
+     * @param tblModel Table model.
+     * @param disjunct Whether current processing frame is located under 
disjunction ("OR"). In this case we cannot
+     *                 rely on join expressions like (A.a = B.b) to build 
co-location model because another conflicting
+     *                 join expression on the same tables migth be located on 
the other side of the "OR".
+     *                 Example: "JOIN ON A.a = B.b OR A.a > B.b".
      * @return Partition tree.
      */
     @SuppressWarnings("EnumSwitchStatementWhichMissesCases")
-    private PartitionNode extractFromExpression(GridSqlAst expr) throws 
IgniteCheckedException {
+    private PartitionNode extractFromExpression(GridSqlAst expr, 
PartitionTableModel tblModel, boolean disjunct)
+        throws IgniteCheckedException {
         PartitionNode res = PartitionAllNode.INSTANCE;
 
         if (expr instanceof GridSqlOperation) {
@@ -186,22 +392,22 @@ public class PartitionExtractor {
 
             switch (op.operationType()) {
                 case AND:
-                    res = extractFromAnd(op);
+                    res = extractFromAnd(op, tblModel, disjunct);
 
                     break;
 
                 case OR:
-                    res = extractFromOr(op);
+                    res = extractFromOr(op, tblModel);
 
                     break;
 
                 case IN:
-                    res = extractFromIn(op);
+                    res = extractFromIn(op, tblModel);
 
                     break;
 
                 case EQUAL:
-                    res = extractFromEqual(op);
+                    res = extractFromEqual(op, tblModel, disjunct);
             }
         }
 
@@ -213,18 +419,21 @@ public class PartitionExtractor {
      * Extract partition information from AND.
      *
      * @param op Operation.
+     * @param tblModel Table model.
+     * @param disjunct Disjunction marker.
      * @return Partition.
      */
-    private PartitionNode extractFromAnd(GridSqlOperation op) throws 
IgniteCheckedException {
+    private PartitionNode extractFromAnd(GridSqlOperation op, 
PartitionTableModel tblModel, boolean disjunct)
+        throws IgniteCheckedException {
         assert op.size() == 2;
 
-        PartitionNode betweenNodes = tryExtractBetween(op);
+        PartitionNode betweenNodes = tryExtractBetween(op, tblModel);
 
         if (betweenNodes != null)
             return betweenNodes;
 
-        PartitionNode part1 = extractFromExpression(op.child(0));
-        PartitionNode part2 = extractFromExpression(op.child(1));
+        PartitionNode part1 = extractFromExpression(op.child(0), tblModel, 
disjunct);
+        PartitionNode part2 = extractFromExpression(op.child(1), tblModel, 
disjunct);
 
         return new PartitionCompositeNode(part1, part2, 
PartitionCompositeNodeOperator.AND);
     }
@@ -233,13 +442,16 @@ public class PartitionExtractor {
      * Extract partition information from OR.
      *
      * @param op Operation.
+     * @param tblModel Table model.
      * @return Partition.
      */
-    private PartitionNode extractFromOr(GridSqlOperation op) throws 
IgniteCheckedException {
+    private PartitionNode extractFromOr(GridSqlOperation op, 
PartitionTableModel tblModel)
+        throws IgniteCheckedException {
         assert op.size() == 2;
 
-        PartitionNode part1 = extractFromExpression(op.child(0));
-        PartitionNode part2 = extractFromExpression(op.child(1));
+        // Parse inner expressions recursively with disjuncion flag set.
+        PartitionNode part1 = extractFromExpression(op.child(0), tblModel, 
true);
+        PartitionNode part2 = extractFromExpression(op.child(1), tblModel, 
true);
 
         return new PartitionCompositeNode(part1, part2, 
PartitionCompositeNodeOperator.OR);
     }
@@ -248,9 +460,11 @@ public class PartitionExtractor {
      * Extract partition information from IN.
      *
      * @param op Operation.
+     * @param tblModel Table model.
      * @return Partition.
      */
-    private PartitionNode extractFromIn(GridSqlOperation op) throws 
IgniteCheckedException {
+    private PartitionNode extractFromIn(GridSqlOperation op, 
PartitionTableModel tblModel)
+        throws IgniteCheckedException {
         // Operation should contain at least two children: left (column) and 
right (const or column).
         if (op.size() < 2)
             return PartitionAllNode.INSTANCE;
@@ -258,11 +472,9 @@ public class PartitionExtractor {
         // Left operand should be column.
         GridSqlAst left = op.child();
 
-        GridSqlColumn leftCol;
+        GridSqlColumn leftCol = unwrapColumn(left);
 
-        if (left instanceof GridSqlColumn)
-            leftCol = (GridSqlColumn)left;
-        else
+        if (leftCol == null)
             return PartitionAllNode.INSTANCE;
 
         // Can work only with Ignite tables.
@@ -291,8 +503,8 @@ public class PartitionExtractor {
                 // set globally. Hence, returning null.
                 return PartitionAllNode.INSTANCE;
 
-            // Do extract.
-            PartitionSingleNode part = extractSingle(leftCol.column(), 
rightConst, rightParam);
+            // Extract.
+            PartitionSingleNode part = extractSingle(leftCol, rightConst, 
rightParam, tblModel);
 
             // Same thing as above: single unknown partition in disjunction 
defeats optimization.
             if (part == null)
@@ -308,19 +520,20 @@ public class PartitionExtractor {
      * Extract partition information from equality.
      *
      * @param op Operation.
+     * @param tblModel Table model.
+     * @param disjunct Disjunction flag. When set possible join expression 
will not be processed.
      * @return Partition.
      */
-    private PartitionNode extractFromEqual(GridSqlOperation op) throws 
IgniteCheckedException {
+    private PartitionNode extractFromEqual(GridSqlOperation op, 
PartitionTableModel tblModel, boolean disjunct)
+        throws IgniteCheckedException {
         assert op.operationType() == GridSqlOperationType.EQUAL;
 
         GridSqlElement left = op.child(0);
         GridSqlElement right = op.child(1);
 
-        GridSqlColumn leftCol;
+        GridSqlColumn leftCol = unwrapColumn(left);
 
-        if (left instanceof GridSqlColumn)
-            leftCol = (GridSqlColumn)left;
-        else
+        if (leftCol == null)
             return PartitionAllNode.INSTANCE;
 
         if (!(leftCol.column().getTable() instanceof GridH2Table))
@@ -337,10 +550,20 @@ public class PartitionExtractor {
             rightConst = null;
             rightParam = (GridSqlParameter)right;
         }
-        else
+        else {
+            if (right instanceof GridSqlColumn) {
+                if (!disjunct) {
+                    PartitionJoinCondition cond = parseJoinCondition(op);
+
+                    if (cond != null && !cond.cross())
+                        tblModel.addJoin(cond);
+                }
+
+            }
             return PartitionAllNode.INSTANCE;
+        }
 
-        PartitionSingleNode part = extractSingle(leftCol.column(), rightConst, 
rightParam);
+        PartitionSingleNode part = extractSingle(leftCol, rightConst, 
rightParam, tblModel);
 
         return part != null ? part : PartitionAllNode.INSTANCE;
     }
@@ -351,52 +574,81 @@ public class PartitionExtractor {
      * @param leftCol Left column.
      * @param rightConst Right constant.
      * @param rightParam Right parameter.
+     * @param tblModel Table model.
      * @return Partition or {@code null} if failed to extract.
      */
-    @Nullable private PartitionSingleNode extractSingle(Column leftCol, 
GridSqlConst rightConst,
-        GridSqlParameter rightParam) throws IgniteCheckedException {
+    @Nullable private PartitionSingleNode extractSingle(
+        GridSqlColumn leftCol,
+        GridSqlConst rightConst,
+        GridSqlParameter rightParam,
+        PartitionTableModel tblModel
+    ) throws IgniteCheckedException {
         assert leftCol != null;
-        assert leftCol.getTable() != null;
-        assert leftCol.getTable() instanceof GridH2Table;
 
-        GridH2Table tbl = (GridH2Table)leftCol.getTable();
+        Column leftCol0 = leftCol.column();
 
-        if (!tbl.isColumnForPartitionPruning(leftCol))
+        assert leftCol0.getTable() != null;
+        assert leftCol0.getTable() instanceof GridH2Table;
+
+        GridH2Table tbl = (GridH2Table)leftCol0.getTable();
+
+        if (!tbl.isColumnForPartitionPruning(leftCol0))
             return null;
 
-        PartitionTableDescriptor tblDesc = descriptor(tbl);
+        PartitionTable tbl0 = tblModel.table(leftCol.tableAlias());
+
+        // If table is in ignored set, then we cannot use it for partition 
extraction.
+        if (tbl0 == null)
+            return null;
 
         if (rightConst != null) {
-            int part = 
idx.kernalContext().affinity().partition(tbl.cacheName(), 
rightConst.value().getObject());
+            Object constVal = H2Utils.convert(rightConst.value().getObject(), 
idx, leftCol0.getType());
+
+            int part = 
idx.kernalContext().affinity().partition(tbl.cacheName(), constVal);
 
-            return new PartitionConstantNode(tblDesc, part);
+            return new PartitionConstantNode(tbl0, part);
         }
         else if (rightParam != null)
-            return new PartitionParameterNode(tblDesc, idx, 
rightParam.index(), leftCol.getType());
+            return new PartitionParameterNode(tbl0, idx, rightParam.index(), 
leftCol0.getType());
         else
             return null;
     }
 
     /**
-     * Get descriptor from table.
+     * Unwrap constant if possible.
+     *
+     * @param ast AST.
+     * @return Constant or {@code null} if not a constant.
+     */
+    @Nullable public static GridSqlConst unwrapConst(GridSqlAst ast) {
+        return ast instanceof GridSqlConst ? (GridSqlConst)ast : null;
+    }
+
+    /**
+     * Unwrap column if possible.
      *
-     * @param tbl Table.
-     * @return Descriptor.
+     * @param ast AST.
+     * @return Column or {@code null} if not a column.
      */
-    private static PartitionTableDescriptor descriptor(GridH2Table tbl) {
-        return new PartitionTableDescriptor(tbl.cacheName(), tbl.getName());
+    @Nullable public static GridSqlColumn unwrapColumn(GridSqlAst ast) {
+        if (ast instanceof GridSqlAlias)
+            ast = ast.child();
+
+        return ast instanceof GridSqlColumn ? (GridSqlColumn)ast : null;
     }
 
     /**
      * Try to extract partitions from {@code op} assuming that it's between 
operation or simple range.
      *
      * @param op Sql operation.
+     * @param tblModel Table model.
      * @return {@code PartitionSingleNode} if operation reduced to one 
partition,
      *   {@code PartitionGroupNode} if operation reduced to multiple 
partitions or null if operation is neither
      *   between nor simple range. Null also returns if it's not possible to 
extract partitions from given operation.
      * @throws IgniteCheckedException If failed.
      */
-    private PartitionNode tryExtractBetween(GridSqlOperation op) throws 
IgniteCheckedException {
+    private PartitionNode tryExtractBetween(GridSqlOperation op, 
PartitionTableModel tblModel)
+        throws IgniteCheckedException {
         // Between operation (or similar range) should contain exact two 
children.
         assert op.size() == 2;
 
@@ -487,11 +739,18 @@ public class PartitionExtractor {
 
         Set<PartitionSingleNode> parts = new HashSet<>();
 
-        PartitionTableDescriptor desc = descriptor(tbl);
+        PartitionTable tbl0 = tblModel.table(leftCol.tableAlias());
+
+        // If table is in ignored set, then we cannot use it for partition 
extraction.
+        if (tbl0 == null)
+            return null;
 
         for (long i = leftLongVal; i <= rightLongVal; i++) {
-            parts.add(new PartitionConstantNode(desc,
-                idx.kernalContext().affinity().partition((tbl).cacheName(), 
i)));
+            Object constVal = H2Utils.convert(i, idx, 
leftCol.column().getType());
+
+            int part = 
idx.kernalContext().affinity().partition(tbl0.cacheName(), constVal);
+
+            parts.add(new PartitionConstantNode(tbl0, part));
 
             if (parts.size() > maxPartsCntBetween)
                 return null;
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java
index ef3a154..3d66439 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionGroupNode.java
@@ -35,22 +35,6 @@ public class PartitionGroupNode implements PartitionNode {
     private final Set<PartitionSingleNode> siblings;
 
     /**
-     * Merge two simple nodes.
-     *
-     * @param node1 Node 1.
-     * @param node2 Node 2.
-     * @return Group node.
-     */
-    public static PartitionGroupNode merge(PartitionSingleNode node1, 
PartitionSingleNode node2) {
-        HashSet<PartitionSingleNode> nodes = new HashSet<>();
-
-        nodes.add(node1);
-        nodes.add(node2);
-
-        return new PartitionGroupNode(nodes);
-    }
-
-    /**
      * Constructor.
      *
      * @param siblings Partitions.
@@ -72,6 +56,13 @@ public class PartitionGroupNode implements PartitionNode {
         return res;
     }
 
+    /** {@inheritDoc} */
+    @Override public int joinGroup() {
+        // Note that we cannot cache join group in constructor. We have strong 
invariant that all siblings always
+        // belongs to the same group. However, number of this group may be 
changed during expression tree traversing.
+        return siblings.iterator().next().joinGroup();
+    }
+
     /**
      * @return Siblings
      */
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinCondition.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinCondition.java
new file mode 100644
index 0000000..244c301
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinCondition.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.affinity;
+
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Join condition.
+ */
+public class PartitionJoinCondition {
+    /** Cross JOIN. */
+    public static final PartitionJoinCondition CROSS = new 
PartitionJoinCondition(null, null, null, null, true);
+
+    /** Left alias. */
+    private final String leftAlias;
+
+    /** Right alias. */
+    private final String rightAlias;
+
+    /** Left column name. */
+    private final String leftCol;
+
+    /** Right column name. */
+    private final String rightCol;
+
+    /** Whether this is a cross-join. */
+    private final boolean cross;
+
+    /**
+     * Constructor.
+     *
+     * @param leftAlias Left alias.
+     * @param rightAlias Right alias.
+     * @param leftCol Left column name.
+     * @param rightCol Right column name.
+     */
+    public PartitionJoinCondition(String leftAlias, String rightAlias, String 
leftCol, String rightCol) {
+        this(leftAlias, rightAlias, leftCol, rightCol, false);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param leftAlias Left alias.
+     * @param rightAlias Right alias.
+     * @param leftCol Left column name.
+     * @param rightCol Right column name.
+     * @param cross Whether this is a cross-join.
+     */
+    private PartitionJoinCondition(String leftAlias, String rightAlias, String 
leftCol, String rightCol,
+        boolean cross) {
+        this.leftAlias = leftAlias;
+        this.rightAlias = rightAlias;
+        this.leftCol = leftCol;
+        this.rightCol = rightCol;
+        this.cross = cross;
+    }
+
+    /**
+     * Left alias.
+     */
+    public String leftAlias() {
+        return leftAlias;
+    }
+
+    /**
+     * Right alias.
+     */
+    public String rightAlias() {
+        return rightAlias;
+    }
+
+    /**
+     * @return Left column.
+     */
+    public String leftColumn() {
+        return leftCol;
+    }
+
+    /**
+     * @return Right column.
+     */
+    public String rightColumn() {
+        return rightCol;
+    }
+
+    /**
+     * @return Wheter this is a cross-join.
+     */
+    public boolean cross() {
+        return cross;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = leftAlias.hashCode();
+
+        res = 31 * res + rightAlias.hashCode();
+        res = 31 * res + leftCol.hashCode();
+        res = 31 * res + rightCol.hashCode();
+        res = 31 * res + Boolean.hashCode(cross);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (obj instanceof PartitionJoinCondition) {
+            PartitionJoinCondition other = (PartitionJoinCondition)obj;
+
+            return F.eq(leftAlias, other.leftAlias) && F.eq(rightAlias, 
other.rightAlias) &&
+                F.eq(leftCol, other.leftCol) && F.eq(rightCol, other.rightCol) 
&& F.eq(cross, other.cross);
+        }
+
+        return false;
+    }
+}
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinGroup.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinGroup.java
new file mode 100644
index 0000000..641d013
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionJoinGroup.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.affinity;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+
+/**
+ * Group of joined tables whose affinity function could be "merged".
+ */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+public class PartitionJoinGroup {
+    /** Tables within a group. */
+    private final Collection<PartitionTable> tbls = 
Collections.newSetFromMap(new IdentityHashMap<>());
+
+    /** Affinity function descriptor. */
+    private final PartitionTableAffinityDescriptor affDesc;
+
+    /**
+     * Constructor.
+     *
+     * @param affDesc Affinity function descriptor.
+     */
+    public PartitionJoinGroup(PartitionTableAffinityDescriptor affDesc) {
+        this.affDesc = affDesc;
+    }
+
+    /**
+     * @return Tables in a group.
+     */
+    public Collection<PartitionTable> tables() {
+        return tbls;
+    }
+
+    /**
+     * Add table to the group.
+     *
+     * @param tbl Table.
+     * @return This for chaining.
+     */
+    public PartitionJoinGroup addTable(PartitionTable tbl) {
+        tbls.add(tbl);
+
+        return this;
+    }
+
+    /**
+     * Remove table from the group.
+     *
+     * @param tbl Table.
+     * @return If group is empty after removal.
+     */
+    public boolean removeTable(PartitionTable tbl) {
+        tbls.remove(tbl);
+
+        return tbls.isEmpty();
+    }
+
+    /**
+     * @return Affinity descriptor.
+     */
+    public PartitionTableAffinityDescriptor affinityDescriptor() {
+        return affDesc;
+    }
+}
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
index 238739c..7372fc2 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNode.java
@@ -35,6 +35,11 @@ public interface PartitionNode {
     Collection<Integer> apply(Object... args) throws IgniteCheckedException;
 
     /**
+     * @return Join group for the given node.
+     */
+    int joinGroup();
+
+    /**
      * Try optimizing partition nodes into a simpler form.
      *
      * @return Optimized node or {@code this} if optimization failed.
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java
index b3a1358..5d4b324 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionNoneNode.java
@@ -42,6 +42,11 @@ public class PartitionNoneNode implements PartitionNode {
     }
 
     /** {@inheritDoc} */
+    @Override public int joinGroup() {
+        return PartitionTableModel.GRP_NONE;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(PartitionNoneNode.class, this);
     }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java
index 0624f2c..e9f4880 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionParameterNode.java
@@ -45,7 +45,7 @@ public class PartitionParameterNode extends 
PartitionSingleNode {
      * @param idx Parameter index.
      * @param dataType Parameter data type.
      */
-    public PartitionParameterNode(PartitionTableDescriptor tbl, 
IgniteH2Indexing indexing, int idx,
+    public PartitionParameterNode(PartitionTable tbl, IgniteH2Indexing 
indexing, int idx,
         int dataType) {
         super(tbl);
 
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java
index 13e7f87..daa14d3 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionResult.java
@@ -24,37 +24,36 @@ import org.apache.ignite.internal.util.typedef.internal.S;
  * Partition extraction result.
  */
 public class PartitionResult {
-    /** Descriptor. */
-    @GridToStringInclude
-    private final PartitionTableDescriptor desc;
-
     /** Tree. */
     @GridToStringInclude
     private final PartitionNode tree;
 
+    /** Affinity function. */
+    private final PartitionTableAffinityDescriptor aff;
+
     /**
      * Constructor.
      *
-     * @param desc Descriptor.
      * @param tree Tree.
+     * @param aff Affinity function.
      */
-    public PartitionResult(PartitionTableDescriptor desc, PartitionNode tree) {
-        this.desc = desc;
+    public PartitionResult(PartitionNode tree, 
PartitionTableAffinityDescriptor aff) {
         this.tree = tree;
+        this.aff = aff;
     }
 
     /**
-     * Descriptor.
+     * Tree.
      */
-    public PartitionTableDescriptor descriptor() {
-        return desc;
+    public PartitionNode tree() {
+        return tree;
     }
 
     /**
-     * Tree.
+     * @return Affinity function.
      */
-    public PartitionNode tree() {
-        return tree;
+    public PartitionTableAffinityDescriptor affinity() {
+        return aff;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java
index caf966c..35e7d30 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionSingleNode.java
@@ -30,14 +30,14 @@ import java.util.Collections;
 public abstract class PartitionSingleNode implements PartitionNode {
     /** Table descriptor. */
     @GridToStringExclude
-    protected final PartitionTableDescriptor tbl;
+    protected final PartitionTable tbl;
 
     /**
      * Constructor.
      *
      * @param tbl Table descriptor.
      */
-    protected PartitionSingleNode(PartitionTableDescriptor tbl) {
+    protected PartitionSingleNode(PartitionTable tbl) {
         this.tbl = tbl;
     }
 
@@ -59,17 +59,29 @@ public abstract class PartitionSingleNode implements 
PartitionNode {
      */
     public abstract boolean constant();
 
+    /** {@inheritDoc} */
+    @Override public int joinGroup() {
+        return tbl.joinGroup();
+    }
+
     /**
      * @return Partition for constant node, index for argument node.
      */
     public abstract int value();
 
+    /**
+     * @return Underlying table.
+     */
+    public PartitionTable table() {
+        return tbl;
+    }
+
     /** {@inheritDoc} */
     @Override public int hashCode() {
         int hash = (constant() ? 1 : 0);
 
         hash = 31 * hash + value();
-        hash = 31 * hash + tbl.hashCode();
+        hash = 31 * hash + tbl.alias().hashCode();
 
         return hash;
     }
@@ -84,6 +96,7 @@ public abstract class PartitionSingleNode implements 
PartitionNode {
 
         PartitionSingleNode other = (PartitionSingleNode)obj;
 
-        return F.eq(constant(), other.constant()) && F.eq(value(), 
other.value()) && F.eq(tbl, other.tbl);
+        return F.eq(constant(), other.constant()) && F.eq(value(), 
other.value()) &&
+            F.eq(tbl.alias(), other.tbl.alias());
     }
 }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTable.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTable.java
new file mode 100644
index 0000000..1b996c1
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTable.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.affinity;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Single table with affinity info.
+ */
+public class PartitionTable {
+    /** Alias used in the query. */
+    private final String alias;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Affinity column name (if can be resolved). */
+    private final String affColName;
+
+    /** Second affinity column name (possible when _KEY is affinity column and 
an alias for this column exists. */
+    private final String secondAffColName;
+
+    /** Join group index. */
+    private int joinGrp;
+
+    /**
+     * Constructor.
+     *
+     * @param alias Unique alias.
+     * @param cacheName Cache name.
+     * @param affColName Affinity column name.
+     * @param secondAffColName Second affinity column name.
+     */
+    public PartitionTable(
+        String alias,
+        String cacheName,
+        @Nullable String affColName,
+        @Nullable String secondAffColName
+    ) {
+        this.alias = alias;
+        this.cacheName = cacheName;
+
+        if (affColName == null && secondAffColName != null) {
+            this.affColName = secondAffColName;
+            this.secondAffColName = null;
+        }
+        else {
+            this.affColName = affColName;
+            this.secondAffColName = secondAffColName;
+        }
+    }
+
+    /**
+     * @return Alias.
+     */
+    public String alias() {
+        return alias;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * Check whether passed column is affinity column.
+     *
+     * @param colName Column name.
+     * @return {@code True} if affinity column.
+     */
+    @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+    public boolean isAffinityColumn(String colName) {
+        return F.eq(colName, affColName) || F.eq(colName, secondAffColName);
+    }
+
+    /**
+     * @return Join group index.
+     */
+    public int joinGroup() {
+        return joinGrp;
+    }
+
+    /**
+     * @param joinGrp Join group index.
+     */
+    public void joinGroup(int joinGrp) {
+        this.joinGrp = joinGrp;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(PartitionTable.class, this);
+    }
+}
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableAffinityDescriptor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableAffinityDescriptor.java
new file mode 100644
index 0000000..21dab9c
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableAffinityDescriptor.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.affinity;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Serializable;
+
+/**
+ * Affinity function descriptor. Used to compare affinity functions of two 
tables.
+ */
+public class PartitionTableAffinityDescriptor implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Affinity function type. */
+    private final PartitionAffinityFunctionType affFunc;
+
+    /** Number of partitions. */
+    private final int parts;
+
+    /** Whether node filter is set. */
+    private final boolean hasNodeFilter;
+
+    /** Data region name. */
+    private final String dataRegion;
+
+    /**
+     * Constructor.
+     *
+     * @param affFunc Affinity function type.
+     * @param parts Number of partitions.
+     * @param hasNodeFilter Whether node filter is set.
+     * @param dataRegion Data region.
+     */
+    public PartitionTableAffinityDescriptor(
+        PartitionAffinityFunctionType affFunc,
+        int parts,
+        boolean hasNodeFilter,
+        String dataRegion
+    ) {
+        this.affFunc = affFunc;
+        this.parts = parts;
+        this.hasNodeFilter = hasNodeFilter;
+        this.dataRegion = dataRegion;
+    }
+
+    /**
+     * Check is provided descriptor is compatible with this instance (i.e. can 
be used in the same co-location group).
+     *
+     * @param other Other descriptor.
+     * @return {@code True} if compatible.
+     */
+    @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+    public boolean isCompatible(PartitionTableAffinityDescriptor other) {
+        if (other == null)
+            return false;
+
+        // Rendezvous affinity function is deterministic and doesn't depend on 
previous cluster view changes.
+        // In future other user affinity functions would be applicable as well 
if explicityl marked deterministic.
+        if (affFunc == PartitionAffinityFunctionType.RENDEZVOUS) {
+            // We cannot be sure that two caches are co-located if custom node 
filter is present.
+            // Nota that technically we may try to compare two filters. 
However, this adds unnecessary complexity
+            // and potential deserialization issues when SQL is called from 
client nodes or thin clients.
+            if (!hasNodeFilter) {
+                return
+                    other.affFunc == PartitionAffinityFunctionType.RENDEZVOUS 
&&
+                    !other.hasNodeFilter &&
+                    other.parts == parts &&
+                    F.eq(other.dataRegion, dataRegion);
+            }
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(PartitionTableAffinityDescriptor.class, this);
+    }
+}
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableDescriptor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableDescriptor.java
deleted file mode 100644
index b11e07e..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableDescriptor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.affinity;
-
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Partition resolver.
- */
-public class PartitionTableDescriptor {
-    /** Cache name. */
-    private final String cacheName;
-
-    /** Table name. */
-    private final String tblName;
-
-    /**
-     * Constructor.
-     *
-     * @param cacheName Cache name.
-     * @param tblName Table name.
-     */
-    public PartitionTableDescriptor(String cacheName, String tblName) {
-        this.cacheName = cacheName;
-        this.tblName = tblName;
-    }
-
-    /**
-     * @return Cache name.
-     */
-    public String cacheName() {
-        return cacheName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return 31 * cacheName.hashCode() + tblName.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (o == this)
-            return true;
-
-        if (o.getClass() != getClass())
-            return false;
-
-        PartitionTableDescriptor other = (PartitionTableDescriptor)o;
-
-        return F.eq(cacheName, other.cacheName) && F.eq(tblName, 
other.tblName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(PartitionTableDescriptor.class, this);
-    }
-}
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableModel.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableModel.java
new file mode 100644
index 0000000..6393941
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/affinity/PartitionTableModel.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.affinity;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Partition join model. Describes how tables are joined with each other.
+ */
+public class PartitionTableModel {
+    /** Join group which could not be applied (e.g. for "ALL" case). */
+    public static final int GRP_NONE = -1;
+
+    /** All tables observed during parsing excluding outer. */
+    private final Map<String, PartitionTable> tbls = new HashMap<>();
+
+    /** Join groups. */
+    private final Map<Integer, PartitionJoinGroup> grps = new HashMap<>();
+
+    /** Talbes which are excluded from partition pruning calculation. */
+    private Set<String> excludedTblNames;
+
+    /** Group index generator */
+    private int grpIdxGen;
+
+    /**
+     * Add table.
+     *
+     * @param tbl Table.
+     * @param aff Affinity descriptor.
+     */
+    public void addTable(PartitionTable tbl, PartitionTableAffinityDescriptor 
aff) {
+        int grpIdx = grpIdxGen++;
+
+        tbl.joinGroup(grpIdx);
+
+        tbls.put(tbl.alias(), tbl);
+        grps.put(grpIdx, new PartitionJoinGroup(aff).addTable(tbl));
+    }
+
+    /**
+     * Get table by alias.
+     *
+     * @param alias Alias.
+     * @return Table or {@code null} if it cannot be used for partition 
pruning.
+     */
+    @Nullable public PartitionTable table(String alias) {
+        PartitionTable res = tbls.get(alias);
+
+        assert res != null || (excludedTblNames != null && 
excludedTblNames.contains(alias));
+
+        return res;
+    }
+
+    /**
+     * Add excluded table
+     *
+     * @param alias Alias.
+     */
+    public void addExcludedTable(String alias) {
+        PartitionTable tbl = tbls.remove(alias);
+
+        if (tbl != null) {
+            PartitionJoinGroup grp = grps.get(tbl.joinGroup());
+
+            assert grp != null;
+
+            if (grp.removeTable(tbl))
+                grps.remove(tbl.joinGroup());
+        }
+
+        if (excludedTblNames == null)
+            excludedTblNames = new HashSet<>();
+
+        excludedTblNames.add(alias);
+    }
+
+    /**
+     * Add equi-join condition. Two joined tables may possibly be merged into 
a single group.
+     *
+     * @param cond Condition.
+     */
+    public void addJoin(PartitionJoinCondition cond) {
+        PartitionTable leftTbl = tbls.get(cond.leftAlias());
+        PartitionTable rightTbl = tbls.get(cond.rightAlias());
+
+        assert leftTbl != null || (excludedTblNames != null && 
excludedTblNames.contains(cond.leftAlias()));
+        assert rightTbl != null || (excludedTblNames != null && 
excludedTblNames.contains(cond.rightAlias()));
+
+        // At least one tables is excluded, return.
+        if (leftTbl == null || rightTbl == null)
+            return;
+
+        // At least one column in condition is not affinity column, return.
+        if (!leftTbl.isAffinityColumn(cond.leftColumn()) || 
!rightTbl.isAffinityColumn(cond.rightColumn()))
+            return;
+
+        // Remember join group of the right table as it will be changed below.
+        int rightGrpId = rightTbl.joinGroup();
+
+        PartitionJoinGroup leftGrp = grps.get(leftTbl.joinGroup());
+        PartitionJoinGroup rightGrp = grps.get(rightGrpId);
+
+        assert leftGrp != null;
+        assert rightGrp != null;
+
+        // Groups are not compatible, return.
+        if 
(!leftGrp.affinityDescriptor().isCompatible(rightGrp.affinityDescriptor()))
+            return;
+
+        // Safe to merge groups.
+        for (PartitionTable tbl : rightGrp.tables()) {
+            tbl.joinGroup(leftTbl.joinGroup());
+
+            leftGrp.addTable(tbl);
+        }
+
+        grps.remove(rightGrpId);
+    }
+
+    /**
+     * Get affinity descriptor for the group.
+     *
+     * @param grpId Group ID.
+     * @return Affinity descriptor or {@code null} if there is no affinity 
descriptor (e.g. for "NONE" result).
+     */
+    @Nullable public PartitionTableAffinityDescriptor joinGroupAffinity(int 
grpId) {
+        if (grpId == GRP_NONE)
+            return null;
+
+        PartitionJoinGroup grp = grps.get(grpId);
+
+        assert grp != null;
+
+        return grp.affinityDescriptor();
+    }
+}
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index b69a011..fff12e3 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -62,7 +62,6 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
 
 /**
  * H2 Table implementation.
@@ -101,8 +100,8 @@ public class GridH2Table extends TableBase {
     /** */
     private final IndexColumn affKeyCol;
 
-    /** */
-    private final int affKeyColId;
+    /** Whether affinity key column is the whole cache key. */
+    private final boolean affKeyColIsKey;
 
     /** */
     private final LongAdder size = new LongAdder();
@@ -122,6 +121,9 @@ public class GridH2Table extends TableBase {
     /** Flag remove index or not when table will be destroyed. */
     private volatile boolean rmIndex;
 
+    /** Columns with thread-safe access. */
+    private volatile Column[] safeColumns;
+
     /**
      * Creates table.
      *
@@ -141,36 +143,8 @@ public class GridH2Table extends TableBase {
         this.desc = desc;
         this.cacheInfo = cacheInfo;
 
-        if (!desc.type().customAffinityKeyMapper()) {
-            String affKeyFieldName = desc.type().affinityKey();
-
-            if (affKeyFieldName != null) {
-                if (doesColumnExist(affKeyFieldName)) {
-                    int colId = getColumn(affKeyFieldName).getColumnId();
-
-                    if (desc.isKeyColumn(colId)) {
-                        affKeyCol = 
indexColumn(GridH2KeyValueRowOnheap.KEY_COL, SortOrder.ASCENDING);
-                        affKeyColId = GridH2KeyValueRowOnheap.KEY_COL;
-                    }
-                    else {
-                        affKeyCol = indexColumn(colId, SortOrder.ASCENDING);
-                        affKeyColId = colId;
-                    }
-                }
-                else {
-                    affKeyCol = null;
-                    affKeyColId = COL_NOT_EXISTS;
-                }
-            }
-            else {
-                affKeyCol = indexColumn(GridH2KeyValueRowOnheap.KEY_COL, 
SortOrder.ASCENDING);
-                affKeyColId = GridH2KeyValueRowOnheap.KEY_COL;
-            }
-        }
-        else {
-            affKeyCol = null;
-            affKeyColId = COL_NOT_EXISTS;
-        }
+        affKeyCol = calculateAffinityKeyColumn();
+        affKeyColIsKey = affKeyCol != null && 
desc.isKeyColumn(affKeyCol.column.getColumnId());
 
         this.rowFactory = rowFactory;
 
@@ -210,6 +184,36 @@ public class GridH2Table extends TableBase {
     }
 
     /**
+     * Calculate affinity key column which will be used for partition pruning 
and distributed joins.
+     *
+     * @return Affinity column or {@code null} if none can be used.
+     */
+    private IndexColumn calculateAffinityKeyColumn() {
+        // If custome affinity key mapper is set, we do not know how to 
convert _KEY to partition, return null.
+        if (desc.type().customAffinityKeyMapper())
+            return null;
+
+        String affKeyFieldName = desc.type().affinityKey();
+
+        // If explicit affinity key field is not set, then use _KEY.
+        if (affKeyFieldName == null)
+            return indexColumn(GridH2KeyValueRowOnheap.KEY_COL, 
SortOrder.ASCENDING);
+
+        // If explicit affinity key field is set, but is not found in the 
table, do not use anything.
+        if (!doesColumnExist(affKeyFieldName))
+            return null;
+
+        int colId = getColumn(affKeyFieldName).getColumnId();
+
+        // If affinity key column is either _KEY or it's alias 
(QueryEntity.keyFieldName), normalize it to _KEY.
+        if (desc.isKeyColumn(colId))
+            return indexColumn(GridH2KeyValueRowOnheap.KEY_COL, 
SortOrder.ASCENDING);
+
+        // Otherwise use column as is.
+        return indexColumn(colId, SortOrder.ASCENDING);
+    }
+
+    /**
      * @return {@code true} If this is a partitioned table.
      */
     public boolean isPartitioned() {
@@ -230,9 +234,65 @@ public class GridH2Table extends TableBase {
      * @return {@code True} if affinity key column.
      */
     public boolean isColumnForPartitionPruning(Column col) {
+        return isColumnForPartitionPruning0(col, false);
+    }
+
+    /**
+     * Check whether passed column could be used for partition transfer during 
partition pruning on joined tables and
+     * for external affinity calculation (e.g. on thin clients).
+     * <p>
+     * Note that it is different from {@link 
#isColumnForPartitionPruning(Column)} method in that not every column
+     * which qualifies for partition pruning can be used by thin clients or 
join partinion prunining logic.
+     * <p>
+     * Consider the following schema:
+     * <pre>
+     * CREATE TABLE dept (id PRIMARY KEY);
+     * CREATE TABLE emp (id, dept_id AFFINITY KEY, PRIMARY KEY(id, dept_id));
+     * </pre>
+     * For expression-based partition pruning on "emp" table on the <b>server 
side</b> we may use both "_KEY" and
+     * "dept_id" columns, as passing them through standard affinity workflow 
will yield the same result:
+     * dept_id -> part
+     * _KEY -> dept_id -> part
+     * <p>
+     * But we cannot use "_KEY" on thin client side, as it doesn't know how to 
extract affinity key field properly.
+     * Neither we can perform partition transfer in JOINs when "_KEY" is used.
+     * <p>
+     * This is OK as data is collocated, so we can merge partitions extracted 
from both tables:
+     * <pre>
+     * SELECT * FROM dept d INNER JOIN emp e ON d.id = e.dept_id WHERE 
e.dept_id=? AND d.id=?
+     * </pre>
+     * But this is not OK as joined data is not collocated, and tables form 
distinct collocation groups:
+     * <pre>
+     * SELECT * FROM dept d INNER JOIN emp e ON d.id = e._KEY WHERE 
e.dept_id=? AND d.id=?
+     * </pre>
+     * NB: The last query is not logically correct and will produce empty 
result. However, it is correct from SQL
+     * perspective, so we should make incorrect assumptions about partitions 
as it may make situation even worse.
+     *
+     * @param col Column.
+     * @return {@code True} if column could be used for partition extraction 
on both server and client sides and for
+     *     partition transfer in joins.
+     */
+    public boolean isColumnForPartitionPruningStrict(Column col) {
+        return isColumnForPartitionPruning0(col, true);
+    }
+
+    /**
+     * Internal logic to check whether column qualifies for partition 
extraction or not.
+     *
+     * @param col Column.
+     * @param strict Strict flag.
+     * @return {@code True} if column could be used for partition.
+     */
+    private boolean isColumnForPartitionPruning0(Column col, boolean strict) {
+        if (affKeyCol == null)
+            return false;
+
         int colId = col.getColumnId();
 
-        return colId == affKeyColId || desc.isKeyColumn(colId);
+        if (colId == affKeyCol.column.getColumnId())
+            return true;
+
+        return (affKeyColIsKey || !strict) && desc.isKeyColumn(colId);
     }
 
     /**
@@ -333,6 +393,7 @@ public class GridH2Table extends TableBase {
      *
      * @param exclusive Exclusive flag.
      */
+    @SuppressWarnings({"LockAcquiredButNotSafelyReleased", 
"CallToThreadYield"})
     private void lock(boolean exclusive) {
         Lock l = exclusive ? lock.writeLock() : lock.readLock();
 
@@ -980,12 +1041,14 @@ public class GridH2Table extends TableBase {
         lock(true);
 
         try {
-            int pos = columns.length;
+            Column[] safeColumns0 = safeColumns;
+
+            int pos = safeColumns0.length;
 
-            Column[] newCols = new Column[columns.length + cols.size()];
+            Column[] newCols = new Column[safeColumns0.length + cols.size()];
 
             // First, let's copy existing columns to new array
-            System.arraycopy(columns, 0, newCols, 0, columns.length);
+            System.arraycopy(safeColumns0, 0, newCols, 0, safeColumns0.length);
 
             // And now, let's add new columns
             for (QueryField col : cols) {
@@ -1026,13 +1089,16 @@ public class GridH2Table extends TableBase {
      * @param cols Columns.
      * @param ifExists If EXISTS flag.
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     public void dropColumns(List<String> cols, boolean ifExists) {
         assert !ifExists || cols.size() == 1;
 
         lock(true);
 
         try {
-            int size = columns.length;
+            Column[] safeColumns0 = safeColumns;
+
+            int size = safeColumns0.length;
 
             for (String name : cols) {
                 if (!doesColumnExist(name)) {
@@ -1052,8 +1118,8 @@ public class GridH2Table extends TableBase {
 
             int dst = 0;
 
-            for (int i = 0; i < columns.length; i++) {
-                Column column = columns[i];
+            for (int i = 0; i < safeColumns0.length; i++) {
+                Column column = safeColumns0[i];
 
                 for (String name : cols) {
                     if (F.eq(name, column.getName())) {
@@ -1084,7 +1150,16 @@ public class GridH2Table extends TableBase {
     }
 
     /** {@inheritDoc} */
+    @Override protected void setColumns(Column[] columns) {
+        this.safeColumns = columns;
+
+        super.setColumns(columns);
+    }
+
+    /** {@inheritDoc} */
     @Override public Column[] getColumns() {
+        Column[] safeColumns0 = safeColumns;
+
         Boolean insertHack = INSERT_HACK.get();
 
         if (insertHack != null && insertHack) {
@@ -1093,15 +1168,15 @@ public class GridH2Table extends TableBase {
             StackTraceElement elem = elems[2];
 
             if (F.eq(elem.getClassName(), Insert.class.getName()) && 
F.eq(elem.getMethodName(), "prepare")) {
-                Column[] columns0 = new Column[columns.length - 3];
+                Column[] columns0 = new Column[safeColumns0.length - 3];
 
-                System.arraycopy(columns, 3, columns0, 0, columns0.length);
+                System.arraycopy(safeColumns0, 3, columns0, 0, 
columns0.length);
 
                 return columns0;
             }
         }
 
-        return columns;
+        return safeColumns0;
     }
 
     /**
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index fcb3424..818777c 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -129,6 +129,9 @@ public class GridSqlQuerySplitter {
     private boolean collocatedGrpBy;
 
     /** */
+    private boolean distributedJoins;
+
+    /** */
     private IdentityHashMap<GridSqlAst, GridSqlAlias> uniqueFromAliases = new 
IdentityHashMap<>();
 
     /** Partition extractor. */
@@ -137,13 +140,15 @@ public class GridSqlQuerySplitter {
     /**
      * @param params Query parameters.
      * @param collocatedGrpBy If it is a collocated GROUP BY query.
-     * @param idx Indexing.
+     * @param distributedJoins Distributed joins flag.
+     * @param extractor Partition extractor.
      */
-    public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy, 
IgniteH2Indexing idx) {
+    public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy, 
boolean distributedJoins,
+        PartitionExtractor extractor) {
         this.params = params;
         this.collocatedGrpBy = collocatedGrpBy;
-
-        extractor = new PartitionExtractor(idx);
+        this.distributedJoins = distributedJoins;
+        this.extractor = extractor;
     }
 
     /**
@@ -207,7 +212,8 @@ public class GridSqlQuerySplitter {
 
         qry.explain(false);
 
-        GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, 
collocatedGrpBy, h2);
+        GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, 
collocatedGrpBy, distributedJoins,
+            h2.partitionExtractor());
 
         // Normalization will generate unique aliases for all the table 
filters in FROM.
         // Also it will collect all tables and schemas from the query.
@@ -262,7 +268,7 @@ public class GridSqlQuerySplitter {
         twoStepQry.distributedJoins(distributedJoins);
 
         // all map queries must have non-empty derivedPartitions to use this 
feature.
-        
twoStepQry.derivedPartitions(splitter.extractor.merge(twoStepQry.mapQueries()));
+        
twoStepQry.derivedPartitions(splitter.extractor.mergeMapQueries(twoStepQry.mapQueries()));
 
         twoStepQry.forUpdate(forUpdate);
 
@@ -1549,7 +1555,7 @@ public class GridSqlQuerySplitter {
         map.partitioned(hasPartitionedTables(mapQry));
         map.hasSubQueries(hasSubQueries);
 
-        if (map.isPartitioned())
+        if (map.isPartitioned() && !distributedJoins)
             map.derivedPartitions(extractor.extract(mapQry));
 
         mapSqlQrys.add(map);
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 62953ec..15788f2 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -393,9 +393,12 @@ public class GridReduceQueryExecutor {
         int timeoutMillis,
         GridQueryCancel cancel,
         Object[] params,
-        final int[] parts,
+        int[] parts,
         boolean lazy,
         MvccQueryTracker mvccTracker) {
+        if (qry.isLocal() && parts != null)
+            parts = null;
+
         assert !qry.mvccEnabled() || mvccTracker != null;
 
         if (F.isEmpty(params))
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java
index fbdbfb0..f6e73bc 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/BetweenOperationExtractPartitionSelfTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
@@ -41,7 +40,6 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -481,22 +479,6 @@ public class BetweenOperationExtractPartitionSelfTest 
extends GridCommonAbstract
     }
 
     /**
-     * Check custom partitions limit exceeding.
-     */
-    @Test
-    public void testBetweenPartitionsCustomLimitExceeding() {
-        try (GridTestUtils.SystemProperty ignored = new GridTestUtils.
-            
SystemProperty(IgniteSystemProperties.IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN,
 "4")){
-
-            // Default limit (16) not exceeded.
-            testBetweenConstOperator(BETWEEN_QRY, 1, 4, 4);
-
-            // Default limit (16) exceeded.
-            testBetweenConstOperator(BETWEEN_QRY, 1, 5, 5, 
EMPTY_PARTITIONS_ARRAY);
-        }
-    }
-
-    /**
      * Check range expression with constant values.
      */
     @Test
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java
new file mode 100644
index 0000000..1429f3f
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java
@@ -0,0 +1,1303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * Tests for join partition pruning.
+ */
+@SuppressWarnings("deprecation")
+@RunWith(JUnit4.class)
+public class JoinPartitionPruningSelfTest extends GridCommonAbstractTest {
+    /** Number of intercepted requests. */
+    private static final AtomicInteger INTERCEPTED_REQS = new AtomicInteger();
+
+    /** Parititions tracked during query execution. */
+    private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS = 
new ConcurrentSkipListSet<>();
+
+    /** IP finder. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder().setShared(true);
+
+    /** Client node name. */
+    private static final String CLI_NAME = "cli";
+
+    /** Memory. */
+    private static final String REGION_MEM = "mem";
+
+    /** Disk. */
+    private static final String REGION_DISK = "disk";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        cleanPersistenceDir();
+
+        startGrid(getConfiguration("srv1"));
+        startGrid(getConfiguration("srv2"));
+        startGrid(getConfiguration("srv3"));
+
+        startGrid(getConfiguration(CLI_NAME).setClientMode(true));
+
+        client().cluster().active(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        clearIoState();
+
+        Ignite cli = client();
+
+        cli.destroyCaches(cli.cacheNames());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) 
throws Exception {
+        IgniteConfiguration res = super.getConfiguration(name);
+
+        res.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+        res.setCommunicationSpi(new TrackingTcpCommunicationSpi());
+
+        res.setLocalHost("127.0.0.1");
+
+        DataRegionConfiguration memRegion =
+            new 
DataRegionConfiguration().setName(REGION_MEM).setPersistenceEnabled(false);
+
+        DataRegionConfiguration diskRegion =
+            new 
DataRegionConfiguration().setName(REGION_DISK).setPersistenceEnabled(true);
+
+        res.setDataStorageConfiguration(new 
DataStorageConfiguration().setDataRegionConfigurations(diskRegion)
+            .setDefaultDataRegionConfiguration(memRegion));
+
+        return res;
+    }
+
+    /**
+     * Test simple join.
+     */
+    @Test
+    public void testSimpleJoin() {
+        createPartitionedTable("t1",
+            pkColumn("k1"),
+            "v2");
+
+        createPartitionedTable("t2",
+            pkColumn("k1"),
+            affinityColumn("ak2"),
+            "v3");
+
+        executeSingle("INSERT INTO t1 VALUES ('1', '1')");
+        executeSingle("INSERT INTO t2 VALUES ('1', '1', '1')");
+
+        executeSingle("INSERT INTO t1 VALUES ('2', '2')");
+        executeSingle("INSERT INTO t2 VALUES ('2', '2', '2')");
+
+        executeSingle("INSERT INTO t1 VALUES ('3', '3')");
+        executeSingle("INSERT INTO t2 VALUES ('3', '3', '3')");
+
+        executeSingle("INSERT INTO t1 VALUES ('4', '4')");
+        executeSingle("INSERT INTO t2 VALUES ('4', '4', '4')");
+
+        executeSingle("INSERT INTO t1 VALUES ('5', '5')");
+        executeSingle("INSERT INTO t2 VALUES ('5', '5', '5')");
+
+        // Key (not alias).
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
= ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t1", "1")
+                );
+                assertEquals(1, res.size());
+                assertEquals("1", res.get(0).get(0));
+            },
+            "1"
+        );
+
+        // Key (alias).
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE 
t1._KEY = ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t1", "2")
+                );
+                assertEquals(1, res.size());
+                assertEquals("2", res.get(0).get(0));
+            },
+            "2"
+        );
+
+        // Non-affinity key.
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t2.k1 
= ?",
+            (res) -> {
+                assertNoPartitions();
+                assertEquals(1, res.size());
+                assertEquals("3", res.get(0).get(0));
+            },
+            "3"
+        );
+
+        // Affinity key.
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t2.ak2 
= ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t2", "4")
+                );
+                assertEquals(1, res.size());
+                assertEquals("4", res.get(0).get(0));
+            },
+            "4"
+        );
+
+        // Complex key.
+        BinaryObject key = client().binary().builder("t2_key").setField("k1", 
"5").setField("ak2", "5").build();
+
+        List<List<?>> res = executeSingle("SELECT * FROM t1 INNER JOIN t2 ON 
t1.k1 = t2.ak2 WHERE t2._KEY = ?", key);
+        assertPartitions(
+            partition("t2", "5")
+        );
+        assertEquals(1, res.size());
+        assertEquals("5", res.get(0).get(0));
+    }
+
+    /**
+     * Test how partition ownership is transferred in various cases.
+     */
+    @Test
+    public void testPartitionTransfer() {
+        // First co-located table.
+        createPartitionedTable("t1",
+            pkColumn("k1"),
+            "v2"
+        );
+
+        // Second co-located table.
+        createPartitionedTable("t2",
+            pkColumn("k1"),
+            affinityColumn("ak2"),
+            "v3"
+        );
+
+        // Third co-located table.
+        createPartitionedTable("t3",
+            pkColumn("k1"),
+            affinityColumn("ak2"),
+            "v3",
+            "v4"
+        );
+
+        // Transfer through "AND".
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
= ? AND t2.ak2 = ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1", "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
= ? AND t2.ak2 = ?",
+            (res) -> assertNoRequests(),
+            "1", "2"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
= ? AND t2.ak2 IN (?, ?)",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1", "1", "2"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
= ? AND t2.ak2 IN (?, ?)",
+            (res) -> assertNoRequests(),
+            "1", "2", "3"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
IN (?, ?) AND t2.ak2 IN (?, ?)",
+            (res) -> assertPartitions(
+                partition("t1", "2")
+            ),
+            "1", "2", "2", "3"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
IN (?, ?) AND t2.ak2 IN (?, ?)",
+            (res) -> assertNoRequests(),
+            "1", "2", "3", "4"
+        );
+
+        // Transfer through "OR".
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
= ? OR t2.ak2 = ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1", "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
= ? OR t2.ak2 = ?",
+            (res) -> assertPartitions(
+                partition("t1", "1"),
+                partition("t2", "2")
+            ),
+            "1", "2"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
= ? OR t2.ak2 IN (?, ?)",
+            (res) -> assertPartitions(
+                partition("t1", "1"),
+                partition("t2", "2")
+            ),
+            "1", "1", "2"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
= ? OR t2.ak2 IN (?, ?)",
+            (res) -> assertPartitions(
+                partition("t1", "1"),
+                partition("t2", "2"),
+                partition("t2", "3")
+            ),
+            "1", "2", "3"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
IN (?, ?) OR t2.ak2 IN (?, ?)",
+            (res) -> assertPartitions(
+                partition("t1", "1"),
+                partition("t1", "2"),
+                partition("t2", "3")
+            ),
+            "1", "2", "2", "3"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
IN (?, ?) OR t2.ak2 IN (?, ?)",
+            (res) -> assertPartitions(
+                partition("t1", "1"),
+                partition("t1", "2"),
+                partition("t2", "3"),
+                partition("t2", "4")
+            ),
+            "1", "2", "3", "4"
+        );
+
+        // Multi-way co-located JOIN.
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 INNER JOIN 
t3 ON t1.k1 = t3.ak2 " +
+                "WHERE t1.k1 = ? AND t2.ak2 = ? AND t3.ak2 = ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1", "1", "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 INNER JOIN 
t3 ON t1.k1 = t3.ak2 " +
+                "WHERE t1.k1 = ? AND t2.ak2 = ? AND t3.ak2 = ?",
+            (res) -> assertNoRequests(),
+            "1", "2", "3"
+        );
+
+        // No transfer through intermediate table.
+        execute("SELECT * FROM t1 INNER JOIN t3 ON t1.k1 = t3.v3 INNER JOIN t2 
ON t3.v4 = t2.ak2 " +
+                "WHERE t1.k1 = ? AND t2.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "1", "1"
+        );
+
+        // No transfer through disjunction.
+        execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t1.k1 = ? OR 
t1.k1 = t2.ak2",
+            (res) -> assertNoPartitions(),
+            "1"
+        );
+    }
+
+    /**
+     * Test cross-joins. They cannot "transfer" partitions between joined 
tables.
+     */
+    @Test
+    public void testCrossJoin() {
+        createPartitionedTable("t1",
+            pkColumn("k1"),
+            "v2");
+
+        createPartitionedTable("t2",
+            pkColumn("k1"),
+            affinityColumn("ak2"),
+            "v3");
+
+        executeSingle("INSERT INTO t1 VALUES ('1', '1')");
+        executeSingle("INSERT INTO t2 VALUES ('1', '1', '1')");
+
+        executeSingle("INSERT INTO t1 VALUES ('2', '2')");
+        executeSingle("INSERT INTO t2 VALUES ('2', '2', '2')");
+
+        executeSingle("INSERT INTO t1 VALUES ('3', '3')");
+        executeSingle("INSERT INTO t2 VALUES ('3', '3', '3')");
+
+        // Left table, should work.
+        execute("SELECT * FROM t1, t2 WHERE t1.k1 = ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t1", "1")
+                );
+                assertEquals(1, res.size());
+                assertEquals("1", res.get(0).get(0));
+            },
+            "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t1.k1 = ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t1", "1")
+                );
+                assertEquals(1, res.size());
+                assertEquals("1", res.get(0).get(0));
+            },
+            "1"
+        );
+
+        // Right table, should work.
+        execute("SELECT * FROM t1, t2 WHERE t2.ak2 = ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t2", "2")
+                );
+                assertEquals(1, res.size());
+                assertEquals("2", res.get(0).get(0));
+            },
+            "2"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t2.ak2 = ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t2", "2")
+                );
+                assertEquals(1, res.size());
+                assertEquals("2", res.get(0).get(0));
+            },
+            "2"
+        );
+
+        execute("SELECT * FROM t1, t2 WHERE t1.k1=? AND t2.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "3", "3"
+        );
+
+        // Two tables, should not work.
+        execute("SELECT * FROM t1, t2 WHERE t1.k1=? AND t2.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "3", "3"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON 1=1 WHERE t1.k1=? AND 
t2.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "3", "3"
+        );
+    }
+
+    /**
+     * Test non-equijoins.
+     */
+    @Test
+    public void testThetaJoin() {
+        createPartitionedTable("t1",
+            pkColumn("k1"),
+            "v2");
+
+        createPartitionedTable("t2",
+            pkColumn("k1"),
+            affinityColumn("ak2"),
+            "v3");
+
+        // Greater than.
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t1.k1 
= ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t2.ak2 
= ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t1.k1 
= ? AND t2.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "1", "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 > t2.ak2 WHERE t1.k1 
= ? OR t2.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "1", "2"
+        );
+
+        // Less than.
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t1.k1 
= ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t2.ak2 
= ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t1.k1 
= ? AND t2.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "1", "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 < t2.ak2 WHERE t1.k1 
= ? OR t2.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "1", "2"
+        );
+
+        // Non-equal.
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE t1.k1 
= ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE 
t2.ak2 = ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE t1.k1 
= ? AND t2.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "1", "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 <> t2.ak2 WHERE t1.k1 
= ? OR t2.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "1", "2"
+        );
+    }
+
+    /**
+     * Test joins with REPLICATED cache.
+     */
+    @Test
+    public void testJoinWithReplicated() {
+        // First co-located table.
+        createPartitionedTable("t1",
+            pkColumn("k1"),
+            "v2"
+        );
+
+        // Replicated table.
+        createReplicatedTable("t2",
+            pkColumn("k1"),
+            "v2",
+            "v3"
+        );
+
+        // Only partition from PARTITIONED cache should be used.
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.k1 WHERE t1.k1 = 
? AND t2.k1 = ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1", "2"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.k1 WHERE t1.k1 
IN (?, ?) AND t2.k1 = ?",
+            (res) -> assertPartitions(
+                partition("t1", "1"),
+                partition("t1", "2")
+            ),
+            "1", "2", "3"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.k1 WHERE t1.k1 = 
? OR t2.k1 = ?",
+            (res) -> assertNoPartitions(),
+            "1", "2"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.k1 WHERE t2.k1 = 
?",
+            (res) -> assertNoPartitions(),
+            "1"
+        );
+    }
+
+    /**
+     * Test joins with different affinity functions.
+     */
+    @Test
+    public void testJoinWithDifferentAffinityFunctions() {
+        // Partition count.
+        checkAffinityFunctions(
+            cacheConfiguration(256, 1, false, false, false),
+            cacheConfiguration(256, 1, false, false, false),
+            true
+        );
+
+        checkAffinityFunctions(
+            cacheConfiguration(1024, 1, false, false, false),
+            cacheConfiguration(256, 1, false, false, false),
+            false
+        );
+
+        checkAffinityFunctions(
+            cacheConfiguration(256, 1, false, false, false),
+            cacheConfiguration(1024, 1, false, false, false),
+            false
+        );
+
+        // Backups.
+        checkAffinityFunctions(
+            cacheConfiguration(256, 1, false, false, false),
+            cacheConfiguration(256, 2, false, false, false),
+            true
+        );
+
+        // Different affinity functions.
+        checkAffinityFunctions(
+            cacheConfiguration(256, 2, true, false, false),
+            cacheConfiguration(256, 2, false, false, false),
+            false
+        );
+
+        checkAffinityFunctions(
+            cacheConfiguration(256, 2, false, false, false),
+            cacheConfiguration(256, 2, true, false, false),
+            false
+        );
+
+        checkAffinityFunctions(
+            cacheConfiguration(256, 2, true, false, false),
+            cacheConfiguration(256, 2, true, false, false),
+            false
+        );
+
+        // Node filters.
+        checkAffinityFunctions(
+            cacheConfiguration(256, 2, false, true, false),
+            cacheConfiguration(256, 2, false, false, false),
+            false
+        );
+
+        checkAffinityFunctions(
+            cacheConfiguration(256, 2, false, false, false),
+            cacheConfiguration(256, 2, false, true, false),
+            false
+        );
+
+        checkAffinityFunctions(
+            cacheConfiguration(256, 2, false, true, false),
+            cacheConfiguration(256, 2, false, true, false),
+            false
+        );
+
+        // With and without persistence.
+        checkAffinityFunctions(
+            cacheConfiguration(256, 2, false, false, true),
+            cacheConfiguration(256, 2, false, false, false),
+            false
+        );
+
+        checkAffinityFunctions(
+            cacheConfiguration(256, 2, false, false, false),
+            cacheConfiguration(256, 2, false, false, true),
+            false
+        );
+
+        checkAffinityFunctions(
+            cacheConfiguration(256, 2, false, false, true),
+            cacheConfiguration(256, 2, false, false, true),
+            true
+        );
+    }
+
+    @SuppressWarnings("unchecked")
+    private void checkAffinityFunctions(CacheConfiguration ccfg1, 
CacheConfiguration ccfg2, boolean compatible) {
+        // Destroy old caches.
+        Ignite cli = client();
+
+        cli.destroyCaches(cli.cacheNames());
+
+        // Start new caches.
+        ccfg1.setName("t1");
+        ccfg2.setName("t2");
+
+        QueryEntity entity1 = new QueryEntity(KeyClass1.class, 
ValueClass.class).setTableName("t1");
+        QueryEntity entity2 = new QueryEntity(KeyClass2.class, 
ValueClass.class).setTableName("t2");
+
+        ccfg1.setQueryEntities(Collections.singletonList(entity1));
+        ccfg2.setQueryEntities(Collections.singletonList(entity2));
+
+        ccfg1.setKeyConfiguration(new 
CacheKeyConfiguration(entity1.getKeyType(), "k1"));
+        ccfg2.setKeyConfiguration(new 
CacheKeyConfiguration(entity2.getKeyType(), "ak2"));
+
+        ccfg1.setSqlSchema(QueryUtils.DFLT_SCHEMA);
+        ccfg2.setSqlSchema(QueryUtils.DFLT_SCHEMA);
+
+        client().createCache(ccfg1);
+        client().createCache(ccfg2);
+
+        // Conduct tests.
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 
= ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t2.ak2 
= ?",
+            (res) -> assertPartitions(
+                partition("t2", "2")
+            ),
+            "2"
+        );
+
+        if (compatible) {
+            execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE 
t1.k1 = ? OR t2.ak2 = ?",
+                (res) -> assertPartitions(
+                    partition("t1", "1"),
+                    partition("t2", "2")
+                ),
+                "1", "2"
+            );
+        }
+        else {
+            execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE 
t1.k1 = ? OR t2.ak2 = ?",
+                (res) -> assertNoPartitions(),
+                "1", "2"
+            );
+        }
+    }
+
+    /**
+     * Create custom cache configuration.
+     *
+     * @param parts Partitions.
+     * @param backups Backups.
+     * @param customAffinity Custom affinity function flag.
+     * @param nodeFilter Whether to set node filter.
+     * @param persistent Whether to enable persistence.
+     * @return Cache configuration.
+     */
+    private static CacheConfiguration cacheConfiguration(
+        int parts,
+        int backups,
+        boolean customAffinity,
+        boolean nodeFilter,
+        boolean persistent
+    ) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setBackups(backups);
+
+        RendezvousAffinityFunction affFunc;
+
+        if (customAffinity)
+            affFunc = new CustomRendezvousAffinityFunction();
+        else
+            affFunc = new RendezvousAffinityFunction();
+
+        affFunc.setPartitions(parts);
+
+        ccfg.setAffinity(affFunc);
+
+        if (nodeFilter)
+            ccfg.setNodeFilter(new CustomNodeFilter());
+
+        if (persistent)
+            ccfg.setDataRegionName(REGION_DISK);
+
+        return ccfg;
+    }
+
+    /**
+     * Test joins with subqueries.
+     */
+    @Test
+    public void testJoinWithSubquery() {
+        createPartitionedTable("t1",
+            pkColumn("k1"),
+            "v2");
+
+        createPartitionedTable("t2",
+            pkColumn("k1"),
+            affinityColumn("ak2"),
+            "v3");
+
+        execute("SELECT * FROM t1 INNER JOIN (SELECT * FROM t2) T2_SUB ON 
t1.k1 = T2_SUB.ak2 WHERE t1.k1 = ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 INNER JOIN (SELECT * FROM t2) T2_SUB ON 
t1.k1 = T2_SUB.ak2 WHERE T2_SUB.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "1"
+        );
+    }
+
+    /**
+     * Test joins when explicit partitions are set.
+     */
+    @Test
+    public void testExplicitPartitions() {
+        createPartitionedTable("t1",
+            pkColumn("k1"),
+            "v2");
+
+        createPartitionedTable("t2",
+            pkColumn("k1"),
+            affinityColumn("ak2"),
+            "v3");
+
+        executeSqlFieldsQuery(new SqlFieldsQuery("SELECT * FROM t1 INNER JOIN 
t2 ON t1.k1 = t2.ak2 " +
+            "WHERE t1.k1=? OR t2.ak2=?").setArgs("1", "2").setPartitions(1));
+
+        assertPartitions(1);
+    }
+
+    /**
+     * Test outer joins.
+     */
+    @Test
+    public void testOuterJoin() {
+        createPartitionedTable("t1",
+            pkColumn("k1"),
+            "v2");
+
+        createPartitionedTable("t2",
+            pkColumn("k1"),
+            affinityColumn("ak2"),
+            "v3");
+
+        execute("SELECT * FROM t1 LEFT OUTER JOIN t2 ON t1.k1 = t2.ak2 WHERE 
t1.k1 = ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 LEFT OUTER JOIN t2 ON t1.k1 = t2.ak2 WHERE 
t2.ak2 = ?",
+            (res) -> assertNoPartitions(),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 LEFT OUTER JOIN t2 T2_1 ON t1.k1 = T2_1.ak2 
INNER JOIN t2 T2_2 ON T2_1.k1 = T2_2.k1 " +
+                "WHERE T2_2.ak2 = ?",
+            (res) -> assertPartitions(
+                partition("t2", "1")
+            ),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 LEFT OUTER JOIN t2 T2_1 ON t1.k1 = T2_1.ak2 
INNER JOIN t2 T2_2 ON t1.k1 = T2_2.ak2 " +
+                "WHERE T2_1.ak2 = ? AND T2_2.ak2=?",
+            (res) -> assertPartitions(
+                partition("t2", "2")
+            ),
+            "1", "2"
+        );
+    }
+
+    /**
+     * Test JOINs on a single table.
+     */
+    @Test
+    public void testSelfJoin() {
+        createPartitionedTable("t1",
+            pkColumn("k1"),
+            "v2");
+
+        execute("SELECT * FROM t1 A INNER JOIN t1 B ON A.k1 = B.k1 WHERE A.k1 
= ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1"
+        );
+
+        execute("SELECT * FROM t1 A INNER JOIN t1 B ON A.k1 = B.k1 WHERE A.k1 
= ? AND B.k1 = ?",
+            (res) -> assertPartitions(
+                partition("t1", "1")
+            ),
+            "1", "1"
+        );
+
+        execute("SELECT * FROM t1 A INNER JOIN t1 B ON A.k1 = B.k1 WHERE A.k1 
= ? AND B.k1 = ?",
+            (res) -> assertNoRequests(),
+            "1", "2"
+        );
+
+        execute("SELECT * FROM t1 A INNER JOIN t1 B ON A.k1 = B.k1 WHERE A.k1 
= ? OR B.k1 = ?",
+            (res) -> assertPartitions(
+                partition("t1", "1"),
+                partition("t1", "2")
+            ),
+            "1", "2"
+        );
+    }
+
+    /**
+     * Create PARTITIONED table.
+     *
+     * @param name Name.
+     * @param cols Columns.
+     */
+    private void createPartitionedTable(String name, Object... cols) {
+        createTable0(name, false, cols);
+    }
+
+    /**
+     * Create REPLICATED table.
+     *
+     * @param name Name.
+     * @param cols Columns.
+     */
+    @SuppressWarnings("SameParameterValue")
+    private void createReplicatedTable(String name, Object... cols) {
+        createTable0(name, true, cols);
+    }
+
+    /**
+     * Internal CREATE TABLE routine.
+     *
+     * @param name Name.
+     * @param replicated Replicated table flag.
+     * @param cols Columns.
+     */
+    @SuppressWarnings("StringConcatenationInsideStringBufferAppend")
+    private void createTable0(String name, boolean replicated, Object... cols) 
{
+        List<String> pkCols = new ArrayList<>();
+
+        String affCol = null;
+
+        StringBuilder sql = new StringBuilder("CREATE TABLE 
").append(name).append("(");
+        for (Object col : cols) {
+            Column col0 = col instanceof Column ? (Column)col : new 
Column((String)col, false, false);
+
+            sql.append(col0.name()).append(" VARCHAR, ");
+
+            if (col0.pk())
+                pkCols.add(col0.name());
+
+            if (col0.affinity()) {
+                if (affCol != null)
+                    throw new IllegalStateException("Only one affinity column 
is allowed: " + col0.name());
+
+                affCol = col0.name();
+            }
+        }
+
+        if (pkCols.isEmpty())
+            throw new IllegalStateException("No PKs!");
+
+        sql.append("PRIMARY KEY (");
+
+        boolean firstPkCol = true;
+
+        for (String pkCol : pkCols) {
+            if (firstPkCol)
+                firstPkCol = false;
+            else
+                sql.append(", ");
+
+            sql.append(pkCol);
+        }
+
+        sql.append(")");
+
+        sql.append(") WITH \"template=" + (replicated ? "replicated" : 
"partitioned"));
+        sql.append(", CACHE_NAME=" + name);
+
+        if (affCol != null) {
+            sql.append(", AFFINITY_KEY=" + affCol);
+            sql.append(", KEY_TYPE=" + name + "_key");
+        }
+
+        sql.append("\"");
+
+        executeSingle(sql.toString());
+    }
+
+    /**
+     * Execute query with all possible combinations of argument placeholders.
+     *
+     * @param sql SQL.
+     * @param resConsumer Result consumer.
+     * @param args Arguments.
+     */
+    public void execute(String sql, Consumer<List<List<?>>> resConsumer, 
Object... args) {
+        System.out.println(">>> TEST COMBINATION: " + sql);
+
+        // Execute query as is.
+        List<List<?>> res = executeSingle(sql, args);
+
+        resConsumer.accept(res);
+
+        // Start filling arguments recursively.
+        if (args != null && args.length > 0)
+            executeCombinations0(sql, resConsumer, new HashSet<>(), args);
+
+        System.out.println();
+    }
+
+    /**
+     * Execute query with all possible combinations of argument placeholders.
+     *
+     * @param sql SQL.
+     * @param resConsumer Result consumer.
+     * @param executedSqls Already executed SQLs.
+     * @param args Arguments.
+     */
+    public void executeCombinations0(
+        String sql,
+        Consumer<List<List<?>>> resConsumer,
+        Set<String> executedSqls,
+        Object... args
+    ) {
+        assert args != null && args.length > 0;
+
+        // Get argument positions.
+        List<Integer> paramPoss = new ArrayList<>();
+
+        int pos = 0;
+
+        while (true) {
+            int paramPos = sql.indexOf('?', pos);
+
+            if (paramPos == -1)
+                break;
+
+            paramPoss.add(paramPos);
+
+            pos = paramPos + 1;
+        }
+
+        for (int i = 0; i < args.length; i++) {
+            // Prepare new SQL and arguments.
+            int paramPos = paramPoss.get(i);
+
+            String newSql = sql.substring(0, paramPos) + args[i] + 
sql.substring(paramPos + 1);
+
+            Object[] newArgs = new Object[args.length - 1];
+
+            int newArgsPos = 0;
+
+            for (int j = 0; j < args.length; j++) {
+                if (j != i)
+                    newArgs[newArgsPos++] = args[j];
+            }
+
+            // Execute if this combination was never executed before.
+            if (executedSqls.add(newSql)) {
+                List<List<?>> res = executeSingle(newSql, newArgs);
+
+                resConsumer.accept(res);
+            }
+
+            // Continue recursively.
+            if (newArgs.length > 0)
+                executeCombinations0(newSql, resConsumer, executedSqls, 
newArgs);
+        }
+    }
+
+    /**
+     * Execute SQL query.
+     *
+     * @param sql SQL.
+     */
+    private List<List<?>> executeSingle(String sql, Object... args) {
+        clearIoState();
+
+        if (args == null || args.length == 0)
+            System.out.println(">>> " + sql);
+        else
+            System.out.println(">>> " + sql + " " + Arrays.toString(args));
+
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        if (args != null && args.length > 0)
+            qry.setArgs(args);
+
+        return executeSqlFieldsQuery(qry);
+    }
+
+    /**
+     * Execute prepared SQL fields query.
+     *
+     * @param qry Query.
+     * @return Result.
+     */
+    private List<List<?>> executeSqlFieldsQuery(SqlFieldsQuery qry) {
+        return client().context().query().querySqlFields(qry, false).getAll();
+    }
+
+    /**
+     * @return Client node.
+     */
+    private IgniteEx client() {
+        return grid(CLI_NAME);
+    }
+
+    /**
+     * Clear partitions.
+     */
+    private static void clearIoState() {
+        INTERCEPTED_REQS.set(0);
+        INTERCEPTED_PARTS.clear();
+    }
+
+    /**
+     * Make sure that expected partitions are logged.
+     *
+     * @param expParts Expected partitions.
+     */
+    private static void assertPartitions(int... expParts) {
+        Collection<Integer> expParts0 = new TreeSet<>();
+
+        for (int expPart : expParts)
+            expParts0.add(expPart);
+
+        assertPartitions(expParts0);
+    }
+
+    /**
+     * Make sure that expected partitions are logged.
+     *
+     * @param expParts Expected partitions.
+     */
+    private static void assertPartitions(Collection<Integer> expParts) {
+        TreeSet<Integer> expParts0 = new TreeSet<>(expParts);
+        TreeSet<Integer> actualParts = new TreeSet<>(INTERCEPTED_PARTS);
+
+        assertEquals("Unexpected partitions [exp=" + expParts + ", actual=" + 
actualParts + ']',
+            expParts0, actualParts);
+    }
+
+    /**
+     * Make sure that no partitions were extracted.
+     */
+    private static void assertNoPartitions() {
+        assertTrue("No requests were sent.", INTERCEPTED_REQS.get() > 0);
+        assertTrue("Partitions are not empty: " + INTERCEPTED_PARTS, 
INTERCEPTED_PARTS.isEmpty());
+    }
+
+    /**
+     * Make sure there were no requests sent because we determined empty 
partition set.
+     */
+    private static void assertNoRequests() {
+        assertEquals("Requests were sent: " + INTERCEPTED_REQS.get(), 0, 
INTERCEPTED_REQS.get());
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @return Partition.
+     */
+    private int partition(String cacheName, Object key) {
+        return client().affinity(cacheName).partition(key);
+    }
+
+    /**
+     * TCP communication SPI which will track outgoing query requests.
+     */
+    private static class TrackingTcpCommunicationSpi extends 
TcpCommunicationSpi {
+        @Override public void sendMessage(ClusterNode node, Message msg, 
IgniteInClosure<IgniteException> ackC) {
+            if (msg instanceof GridIoMessage) {
+                GridIoMessage msg0 = (GridIoMessage)msg;
+
+                if (msg0.message() instanceof GridH2QueryRequest) {
+                    INTERCEPTED_REQS.incrementAndGet();
+
+                    GridH2QueryRequest req = 
(GridH2QueryRequest)msg0.message();
+
+                    int[] parts = req.queryPartitions();
+
+                    if (!F.isEmpty(parts)) {
+                        for (int part : parts)
+                            INTERCEPTED_PARTS.add(part);
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+
+    /**
+     * @param name Name.
+     * @return PK column.
+     */
+    public Column pkColumn(String name) {
+        return new Column(name, true, false);
+    }
+
+    /**
+     * @param name Name.
+     * @return Affintiy column.
+     */
+    public Column affinityColumn(String name) {
+        return new Column(name, true, true);
+    }
+
+    /**
+     * Column.
+     */
+    private static class Column {
+        /** Name. */
+        private final String name;
+
+        /** PK. */
+        private final boolean pk;
+
+        /** Affinity key. */
+        private final boolean aff;
+
+        /**
+         * Constructor.
+         *
+         * @param name Name.
+         * @param pk PK flag.
+         * @param aff Affinity flag.
+         */
+        public Column(String name, boolean pk, boolean aff) {
+            this.name = name;
+            this.pk = pk;
+            this.aff = aff;
+        }
+
+        /**
+         * @return Name.
+         */
+        public String name() {
+            return name;
+        }
+
+        /**
+         * @return PK flag.
+         */
+        public boolean pk() {
+            return pk;
+        }
+
+        /**
+         * @return Affintiy flag.
+         */
+        public boolean affinity() {
+            return aff;
+        }
+    }
+
+    /**
+     * Custom affinity function.
+     */
+    private static class CustomRendezvousAffinityFunction extends 
RendezvousAffinityFunction {
+        // No-op.
+    }
+
+    /**
+     * Custom node filter.
+     */
+    private static class CustomNodeFilter implements 
IgnitePredicate<ClusterNode> {
+        @Override public boolean apply(ClusterNode clusterNode) {
+            return true;
+        }
+    }
+
+    /**
+     * Key class 1.
+     */
+    @SuppressWarnings("unused")
+    private static class KeyClass1 {
+        /** Key. */
+        @QuerySqlField
+        private String k1;
+    }
+
+    /**
+     * Key class 2.
+     */
+    @SuppressWarnings("unused")
+    private static class KeyClass2 {
+        /** Key. */
+        @QuerySqlField
+        private String k1;
+
+        /** Affinity key. */
+        @QuerySqlField
+        private String ak2;
+    }
+
+    /**
+     * Value class.
+     */
+    @SuppressWarnings("unused")
+    private static class ValueClass {
+        /** Value. */
+        @QuerySqlField
+        private String v;
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index ce8f6cc..5ffd7fa 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -215,6 +215,7 @@ import 
org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.AndOperationExtractPartitionSelfTest;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.BetweenOperationExtractPartitionSelfTest;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.InOperationExtractPartitionSelfTest;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.JoinPartitionPruningSelfTest;
 import 
org.apache.ignite.internal.processors.sql.IgniteCachePartitionedAtomicColumnConstraintsTest;
 import 
org.apache.ignite.internal.processors.sql.IgniteCachePartitionedTransactionalColumnConstraintsTest;
 import 
org.apache.ignite.internal.processors.sql.IgniteCachePartitionedTransactionalSnapshotColumnConstraintTest;
@@ -527,6 +528,7 @@ import org.junit.runners.Suite;
     InOperationExtractPartitionSelfTest.class,
     AndOperationExtractPartitionSelfTest.class,
     BetweenOperationExtractPartitionSelfTest.class,
+    JoinPartitionPruningSelfTest.class,
 
     GridCacheDynamicLoadOnClientTest.class,
     GridCacheDynamicLoadOnClientPersistentTest.class,

Reply via email to