Repository: ignite
Updated Branches:
  refs/heads/master bf75ef22d -> 6613376d2


IGNITE-10535: SQL: moved reducer partition resolution logic to separate 
classes. This closes #5575.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6613376d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6613376d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6613376d

Branch: refs/heads/master
Commit: 6613376d2cdd9bbe68dd7b3167cfd5c16a39f67e
Parents: bf75ef2
Author: devozerov <voze...@gridgain.com>
Authored: Thu Dec 6 17:47:50 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Thu Dec 6 17:47:50 2018 +0300

----------------------------------------------------------------------
 .../h2/twostep/GridReduceQueryExecutor.java     | 648 +------------------
 .../h2/twostep/ReducePartitionMapResult.java    |  73 +++
 .../query/h2/twostep/ReducePartitionMapper.java | 638 ++++++++++++++++++
 .../h2/twostep/ReducePartitionsSpecializer.java |  61 ++
 4 files changed, 787 insertions(+), 633 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6613376d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 36287b3..905a4a3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -22,16 +22,12 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -47,7 +43,6 @@ import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -61,7 +56,6 @@ import 
org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSelectForUpdateFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.TxTopologyVersionFuture;
@@ -89,8 +83,6 @@ import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlReque
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2SelectForUpdateTxDetails;
-import org.apache.ignite.internal.util.GridIntIterator;
-import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CIX2;
@@ -113,9 +105,6 @@ import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.singletonList;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
-import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
-import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
-import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
@@ -138,9 +127,6 @@ public class GridReduceQueryExecutor {
     private static final String MERGE_INDEX_SORTED = "merge_sorted";
 
     /** */
-    private static final Set<ClusterNode> UNMAPPED_PARTS = 
Collections.emptySet();
-
-    /** */
     private GridKernalContext ctx;
 
     /** */
@@ -174,7 +160,12 @@ public class GridReduceQueryExecutor {
         }
     };
 
+    /** Partition mapper. */
+    private ReducePartitionMapper mapper;
+
     /**
+     * Constructor.
+     *
      * @param qryIdGen Query ID generator.
      * @param busyLock Busy lock.
      */
@@ -194,6 +185,8 @@ public class GridReduceQueryExecutor {
 
         log = ctx.log(GridReduceQueryExecutor.class);
 
+        mapper = new ReducePartitionMapper(ctx, log);
+
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new 
GridMessageListener() {
             @SuppressWarnings("deprecation")
             @Override public void onMessage(UUID nodeId, Object msg, byte plc) 
{
@@ -370,32 +363,6 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param cacheIds Cache IDs.
-     * @return {@code true} If preloading is active.
-     */
-    private boolean isPreloadingActive(List<Integer> cacheIds) {
-        for (Integer cacheId : cacheIds) {
-            if (null == cacheContext(cacheId))
-                throw new CacheException(String.format("Cache not found on 
local node [cacheId=%d]", cacheId));
-
-            if (hasMovingPartitions(cacheContext(cacheId)))
-                return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @return {@code True} If cache has partitions in {@link 
GridDhtPartitionState#MOVING} state.
-     */
-    private boolean hasMovingPartitions(GridCacheContext<?, ?> cctx) {
-        assert cctx != null;
-
-        return !cctx.isLocal() && cctx.topology().hasMovingPartitions();
-    }
-
-    /**
      * @param cacheId Cache ID.
      * @return Cache context.
      */
@@ -404,154 +371,6 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param topVer Topology version.
-     * @param cctx Cache context.
-     * @param parts Partitions.
-     */
-    private Map<ClusterNode, IntArray> 
stableDataNodesMap(AffinityTopologyVersion topVer,
-        final GridCacheContext<?, ?> cctx, @Nullable final int[] parts) {
-
-        Map<ClusterNode, IntArray> mapping = new HashMap<>();
-
-        // Explicit partitions mapping is not applicable to replicated cache.
-        if (cctx.isReplicated()) {
-            for (ClusterNode clusterNode : 
cctx.affinity().assignment(topVer).nodes())
-                mapping.put(clusterNode, null);
-
-            return mapping;
-        }
-
-        List<List<ClusterNode>> assignment = 
cctx.affinity().assignment(topVer).assignment();
-
-        boolean needPartsFilter = parts != null;
-
-        GridIntIterator iter = needPartsFilter ? new 
GridIntList(parts).iterator() :
-            U.forRange(0, cctx.affinity().partitions());
-
-        while(iter.hasNext()) {
-            int partId = iter.next();
-
-            List<ClusterNode> partNodes = assignment.get(partId);
-
-            if (!partNodes.isEmpty()) {
-                ClusterNode prim = partNodes.get(0);
-
-                if (!needPartsFilter) {
-                    mapping.put(prim, null);
-
-                    continue;
-                }
-
-                IntArray partIds = mapping.get(prim);
-
-                if (partIds == null) {
-                    partIds = new IntArray();
-
-                    mapping.put(prim, partIds);
-                }
-
-                partIds.add(partId);
-            }
-        }
-
-        return mapping;
-    }
-
-    /**
-     * Load failed partition reservation.
-     *
-     * @param msg Message.
-     */
-    private void logRetry(String msg) {
-        log.info(msg);
-    }
-
-    /**
-     * @param isReplicatedOnly If we must only have replicated caches.
-     * @param topVer Topology version.
-     * @param cacheIds Participating cache IDs.
-     * @param parts Partitions.
-     * @param qryId Query ID.
-     * @return Data nodes or {@code null} if repartitioning started and we 
need to retry.
-     */
-    private Map<ClusterNode, IntArray> stableDataNodes(boolean 
isReplicatedOnly, AffinityTopologyVersion topVer,
-        List<Integer> cacheIds, int[] parts, long qryId) {
-        GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(0));
-
-        // If the first cache is not partitioned, find it (if it's present) 
and move it to index 0.
-        if (!cctx.isPartitioned()) {
-            for (int cacheId = 1; cacheId < cacheIds.size(); cacheId++) {
-                GridCacheContext<?, ?> currCctx = 
cacheContext(cacheIds.get(cacheId));
-
-                if (currCctx.isPartitioned()) {
-                    Collections.swap(cacheIds, 0, cacheId);
-
-                    cctx = currCctx;
-
-                    break;
-                }
-            }
-        }
-
-        Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, 
parts);
-
-        Set<ClusterNode> nodes = map.keySet();
-
-        if (F.isEmpty(map))
-            throw new CacheException("Failed to find data nodes for cache: " + 
cctx.name());
-
-        for (int i = 1; i < cacheIds.size(); i++) {
-            GridCacheContext<?,?> extraCctx = cacheContext(cacheIds.get(i));
-
-            String extraCacheName = extraCctx.name();
-
-            if (extraCctx.isLocal())
-                continue; // No consistency guaranties for local caches.
-
-            if (isReplicatedOnly && !extraCctx.isReplicated())
-                throw new CacheException("Queries running on replicated cache 
should not contain JOINs " +
-                    "with partitioned tables [replicatedCache=" + cctx.name() +
-                    ", partitionedCache=" + extraCacheName + "]");
-
-            Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, 
extraCctx, parts).keySet();
-
-            if (F.isEmpty(extraNodes))
-                throw new CacheException("Failed to find data nodes for cache: 
" + extraCacheName);
-
-            boolean disjoint;
-
-            if (extraCctx.isReplicated()) {
-                if (isReplicatedOnly) {
-                    nodes.retainAll(extraNodes);
-
-                    disjoint = map.isEmpty();
-                }
-                else
-                    disjoint = !extraNodes.containsAll(nodes);
-            }
-            else
-                disjoint = !extraNodes.equals(nodes);
-
-            if (disjoint) {
-                if (isPreloadingActive(cacheIds)) {
-                    logRetry("Failed to calculate nodes for SQL query (got 
disjoint node map during rebalance) " +
-                        "[qryId=" + qryId + ", affTopVer=" + topVer + ", 
cacheIds=" + cacheIds +
-                        ", parts=" + (parts == null ? "[]" : 
Arrays.toString(parts)) +
-                        ", replicatedOnly=" + isReplicatedOnly + ", 
lastCache=" + extraCctx.name() +
-                        ", lastCacheId=" + extraCctx.cacheId() + ']');
-
-                    return null; // Retry.
-                }
-                else
-                    throw new CacheException("Caches have distinct sets of 
data nodes [cache1=" + cctx.name() +
-                        ", cache2=" + extraCacheName + "]");
-            }
-        }
-
-        return map;
-    }
-
-    /**
      * @param schemaName Schema name.
      * @param qry Query.
      * @param keepBinary Keep binary.
@@ -686,8 +505,8 @@ public class GridReduceQueryExecutor {
             if (qry.isLocal())
                 nodes = singletonList(ctx.discovery().localNode());
             else {
-                NodesForPartitionsResult nodesParts =
-                    nodesForPartitions(cacheIds, topVer, parts, 
isReplicatedOnly, qryReqId);
+                ReducePartitionMapResult nodesParts =
+                    mapper.nodesForPartitions(cacheIds, topVer, parts, 
isReplicatedOnly, qryReqId);
 
                 nodes = nodesParts.nodes();
                 partsMap = nodesParts.partitionsMap();
@@ -843,7 +662,7 @@ public class GridReduceQueryExecutor {
                     req.mvccSnapshot(mvccTracker.snapshot());
 
                 final C2<ClusterNode, Message, Message> pspec =
-                    (parts == null ? null : new 
ExplicitPartitionsSpecializer(qryMap));
+                    (parts == null ? null : new 
ReducePartitionsSpecializer(qryMap));
 
                 final C2<ClusterNode, Message, Message> spec;
 
@@ -905,7 +724,7 @@ public class GridReduceQueryExecutor {
                 else // Send failed.
                     retry = true;
 
-                Iterator<List<?>> resIter = null;
+                Iterator<List<?>> resIter;
 
                 if (!retry) {
                     if (skipMergeTbl) {
@@ -1050,7 +869,8 @@ public class GridReduceQueryExecutor {
 
         final long reqId = qryIdGen.incrementAndGet();
 
-        NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, 
topVer, parts, isReplicatedOnly, reqId);
+        ReducePartitionMapResult nodesParts =
+            mapper.nodesForPartitions(cacheIds, topVer, parts, 
isReplicatedOnly, reqId);
 
         final GridRunningQueryInfo qryInfo = new GridRunningQueryInfo(reqId, 
selectQry, GridCacheQueryType.SQL_FIELDS,
             schemaName, U.currentTimeMillis(), cancel, false);
@@ -1105,8 +925,8 @@ public class GridReduceQueryExecutor {
             Map<ClusterNode, IntArray> partsMap = 
(nodesParts.queryPartitionsMap() != null) ?
                 nodesParts.queryPartitionsMap() : nodesParts.partitionsMap();
 
-            ExplicitPartitionsSpecializer partsSpec = (parts == null) ? null :
-                new ExplicitPartitionsSpecializer(partsMap);
+            ReducePartitionsSpecializer partsSpec = (parts == null) ? null :
+                new ReducePartitionsSpecializer(partsMap);
 
             final Collection<ClusterNode> finalNodes = nodes;
 
@@ -1290,286 +1110,6 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * Calculates data nodes for replicated caches on unstable topology.
-     *
-     * @param cacheIds Cache IDs.
-     * @param qryId Query ID.
-     * @return Collection of all data nodes owning all the caches or {@code 
null} for retry.
-     */
-    private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> 
cacheIds, long qryId) {
-        int i = 0;
-
-        GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i++));
-
-        // The main cache is allowed to be partitioned.
-        if (!cctx.isReplicated()) {
-            assert cacheIds.size() > 1: "no extra replicated caches with 
partitioned main cache";
-
-            // Just replace the main cache with the first one extra.
-            cctx = cacheContext(cacheIds.get(i++));
-
-            assert cctx.isReplicated(): "all the extra caches must be 
replicated here";
-        }
-
-        Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx, qryId);
-
-        if (F.isEmpty(nodes))
-            return null; // Retry.
-
-        for (;i < cacheIds.size(); i++) {
-            GridCacheContext<?, ?> extraCctx = cacheContext(cacheIds.get(i));
-
-            if (extraCctx.isLocal())
-                continue;
-
-            if (!extraCctx.isReplicated())
-                throw new CacheException("Queries running on replicated cache 
should not contain JOINs " +
-                    "with tables in partitioned caches [replicatedCache=" + 
cctx.name() + ", " +
-                    "partitionedCache=" + extraCctx.name() + "]");
-
-            Set<ClusterNode> extraOwners = 
replicatedUnstableDataNodes(extraCctx, qryId);
-
-            if (F.isEmpty(extraOwners))
-                return null; // Retry.
-
-            nodes.retainAll(extraOwners);
-
-            if (nodes.isEmpty()) {
-                logRetry("Failed to calculate nodes for SQL query (got 
disjoint node map for REPLICATED caches " +
-                    "during rebalance) [qryId=" + qryId + ", cacheIds=" + 
cacheIds +
-                    ", lastCache=" + extraCctx.name() + ", lastCacheId=" + 
extraCctx.cacheId() + ']');
-
-                return null; // Retry.
-            }
-        }
-
-        return nodes;
-    }
-
-    /**
-     * @param grpId Cache group ID.
-     * @param topVer Topology version.
-     * @return Collection of data nodes.
-     */
-    private Collection<ClusterNode> dataNodes(int grpId, 
AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> res = 
ctx.discovery().cacheGroupAffinityNodes(grpId, topVer);
-
-        return res != null ? res : Collections.<ClusterNode>emptySet();
-    }
-
-    /**
-     * Collects all the nodes owning all the partitions for the given 
replicated cache.
-     *
-     * @param cctx Cache context.
-     * @param qryId Query ID.
-     * @return Owning nodes or {@code null} if we can't find owners for some 
partitions.
-     */
-    private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> 
cctx, long qryId) {
-        assert cctx.isReplicated() : cctx.name() + " must be replicated";
-
-        String cacheName = cctx.name();
-
-        Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(cctx.groupId(), 
NONE));
-
-        if (dataNodes.isEmpty())
-            throw new CacheException("Failed to find data nodes for cache: " + 
cacheName);
-
-        // Find all the nodes owning all the partitions for replicated cache.
-        for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
-            List<ClusterNode> owners = cctx.topology().owners(p);
-
-            if (F.isEmpty(owners)) {
-                logRetry("Failed to calculate nodes for SQL query (partition 
of a REPLICATED cache has no owners) [" +
-                    "qryId=" + qryId + ", cacheName=" + cctx.name() + ", 
cacheId=" + cctx.cacheId() +
-                    ", part=" + p + ']');
-
-                return null; // Retry.
-            }
-
-            dataNodes.retainAll(owners);
-
-            if (dataNodes.isEmpty()) {
-                logRetry("Failed to calculate nodes for SQL query (partitions 
of a REPLICATED has no common owners) [" +
-                    "qryId=" + qryId + ", cacheName=" + cctx.name() + ", 
cacheId=" + cctx.cacheId() +
-                    ", lastPart=" + p + ']');
-
-                return null; // Retry.
-            }
-        }
-
-        return dataNodes;
-    }
-
-    /**
-     * Calculates partition mapping for partitioned cache on unstable topology.
-     *
-     * @param cacheIds Cache IDs.
-     * @param qryId Query ID.
-     * @return Partition mapping or {@code null} if we can't calculate it due 
to repartitioning and we need to retry.
-     */
-    @SuppressWarnings("unchecked")
-    private Map<ClusterNode, IntArray> 
partitionedUnstableDataNodes(List<Integer> cacheIds, long qryId) {
-        // If the main cache is replicated, just replace it with the first 
partitioned.
-        GridCacheContext<?,?> cctx = findFirstPartitioned(cacheIds);
-
-        final int partsCnt = cctx.affinity().partitions();
-
-        if (cacheIds.size() > 1) { // Check correct number of partitions for 
partitioned caches.
-            for (Integer cacheId : cacheIds) {
-                GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
-
-                if (extraCctx.isReplicated() || extraCctx.isLocal())
-                    continue;
-
-                int parts = extraCctx.affinity().partitions();
-
-                if (parts != partsCnt)
-                    throw new CacheException("Number of partitions must be the 
same for correct collocation [cache1=" +
-                        cctx.name() + ", parts1=" + partsCnt + ", cache2=" + 
extraCctx.name() +
-                        ", parts2=" + parts + "]");
-            }
-        }
-
-        Set<ClusterNode>[] partLocs = new Set[partsCnt];
-
-        // Fill partition locations for main cache.
-        for (int p = 0; p < partsCnt; p++) {
-            List<ClusterNode> owners = cctx.topology().owners(p);
-
-            if (F.isEmpty(owners)) {
-                // Handle special case: no mapping is configured for a 
partition.
-                if (F.isEmpty(cctx.affinity().assignment(NONE).get(p))) {
-                    partLocs[p] = UNMAPPED_PARTS; // Mark unmapped partition.
-
-                    continue;
-                }
-                else if (!F.isEmpty(dataNodes(cctx.groupId(), NONE))) {
-                    logRetry("Failed to calculate nodes for SQL query 
(partition has no owners, but corresponding " +
-                        "cache group has data nodes) [qryId=" + qryId + ", 
cacheIds=" + cacheIds +
-                        ", cacheName=" + cctx.name() + ", cacheId=" + 
cctx.cacheId() + ", part=" + p +
-                        ", cacheGroupId=" + cctx.groupId() + ']');
-
-                    return null; // Retry.
-                }
-
-                throw new CacheException("Failed to find data nodes [cache=" + 
cctx.name() + ", part=" + p + "]");
-            }
-
-            partLocs[p] = new HashSet<>(owners);
-        }
-
-        if (cacheIds.size() > 1) {
-            // Find owner intersections for each participating partitioned 
cache partition.
-            // We need this for logical collocation between different 
partitioned caches with the same affinity.
-            for (Integer cacheId : cacheIds) {
-                GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
-
-                // This is possible if we have replaced a replicated cache 
with a partitioned one earlier.
-                if (cctx == extraCctx)
-                    continue;
-
-                if (extraCctx.isReplicated() || extraCctx.isLocal())
-                    continue;
-
-                for (int p = 0, parts = extraCctx.affinity().partitions(); p < 
parts; p++) {
-                    List<ClusterNode> owners = extraCctx.topology().owners(p);
-
-                    if (partLocs[p] == UNMAPPED_PARTS)
-                        continue; // Skip unmapped partitions.
-
-                    if (F.isEmpty(owners)) {
-                        if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE))) {
-                            logRetry("Failed to calculate nodes for SQL query 
(partition has no owners, but " +
-                                "corresponding cache group has data nodes) 
[qryId=" + qryId +
-                                ", cacheIds=" + cacheIds + ", cacheName=" + 
extraCctx.name() +
-                                ", cacheId=" + extraCctx.cacheId() + ", part=" 
+ p +
-                                ", cacheGroupId=" + extraCctx.groupId() + ']');
-
-                            return null; // Retry.
-                        }
-
-                        throw new CacheException("Failed to find data nodes 
[cache=" + extraCctx.name() +
-                            ", part=" + p + "]");
-                    }
-
-                    if (partLocs[p] == null)
-                        partLocs[p] = new HashSet<>(owners);
-                    else {
-                        partLocs[p].retainAll(owners); // Intersection of 
owners.
-
-                        if (partLocs[p].isEmpty()) {
-                            logRetry("Failed to calculate nodes for SQL query 
(caches have no common data nodes for " +
-                                "partition) [qryId=" + qryId + ", cacheIds=" + 
cacheIds +
-                                ", lastCacheName=" + extraCctx.name() + ", 
lastCacheId=" + extraCctx.cacheId() +
-                                ", part=" + p + ']');
-
-                            return null; // Intersection is empty -> retry.
-                        }
-                    }
-                }
-            }
-
-            // Filter nodes where not all the replicated caches loaded.
-            for (Integer cacheId : cacheIds) {
-                GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
-
-                if (!extraCctx.isReplicated())
-                    continue;
-
-                Set<ClusterNode> dataNodes = 
replicatedUnstableDataNodes(extraCctx, qryId);
-
-                if (F.isEmpty(dataNodes))
-                    return null; // Retry.
-
-                int part = 0;
-
-                for (Set<ClusterNode> partLoc : partLocs) {
-                    if (partLoc == UNMAPPED_PARTS)
-                        continue; // Skip unmapped partition.
-
-                    partLoc.retainAll(dataNodes);
-
-                    if (partLoc.isEmpty()) {
-                        logRetry("Failed to calculate nodes for SQL query 
(caches have no common data nodes for " +
-                            "partition) [qryId=" + qryId + ", cacheIds=" + 
cacheIds +
-                            ", lastReplicatedCacheName=" + extraCctx.name() +
-                            ", lastReplicatedCacheId=" + extraCctx.cacheId() + 
", part=" + part + ']');
-
-                        return null; // Retry.
-                    }
-
-                    part++;
-                }
-            }
-        }
-
-        // Collect the final partitions mapping.
-        Map<ClusterNode, IntArray> res = new HashMap<>();
-
-        // Here partitions in all IntArray's will be sorted in ascending 
order, this is important.
-        for (int p = 0; p < partLocs.length; p++) {
-            Set<ClusterNode> pl = partLocs[p];
-
-            // Skip unmapped partitions.
-            if (pl == UNMAPPED_PARTS)
-                continue;
-
-            assert !F.isEmpty(pl) : pl;
-
-            ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl);
-
-            IntArray parts = res.get(n);
-
-            if (parts == null)
-                res.put(n, parts = new IntArray());
-
-            parts.add(p);
-        }
-
-        return res;
-    }
-
-    /**
      * @param c Connection.
      * @param qry Query.
      * @param params Query parameters.
@@ -1681,63 +1221,6 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * Evaluates nodes and nodes to partitions map given a list of cache ids, 
topology version and partitions.
-     *
-     * @param cacheIds Cache ids.
-     * @param topVer Topology version.
-     * @param parts Partitions array.
-     * @param isReplicatedOnly Allow only replicated caches.
-     * @param qryId Query ID.
-     * @return Result.
-     */
-    private NodesForPartitionsResult nodesForPartitions(List<Integer> 
cacheIds, AffinityTopologyVersion topVer,
-        int[] parts, boolean isReplicatedOnly, long qryId) {
-        Collection<ClusterNode> nodes = null;
-        Map<ClusterNode, IntArray> partsMap = null;
-        Map<ClusterNode, IntArray> qryMap = null;
-
-        for (int cacheId : cacheIds) {
-            GridCacheContext<?, ?> cctx = cacheContext(cacheId);
-
-            PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
-
-            if (plc != READ_ONLY_SAFE && plc != READ_WRITE_SAFE)
-                continue;
-
-            Collection<Integer> lostParts = cctx.topology().lostPartitions();
-
-            for (int part : lostParts) {
-                if (parts == null || Arrays.binarySearch(parts, part) >= 0) {
-                    throw new CacheException("Failed to execute query because 
cache partition has been " +
-                        "lost [cacheName=" + cctx.name() + ", part=" + part + 
']');
-                }
-            }
-        }
-
-        if (isPreloadingActive(cacheIds)) {
-            if (isReplicatedOnly)
-                nodes = replicatedUnstableDataNodes(cacheIds, qryId);
-            else {
-                partsMap = partitionedUnstableDataNodes(cacheIds, qryId);
-
-                if (partsMap != null) {
-                    qryMap = narrowForQuery(partsMap, parts);
-
-                    nodes = qryMap == null ? null : qryMap.keySet();
-                }
-            }
-        }
-        else {
-            qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, 
parts, qryId);
-
-            if (qryMap != null)
-                nodes = qryMap.keySet();
-        }
-
-        return new NodesForPartitionsResult(nodes, partsMap, qryMap);
-    }
-
-    /**
      * @param conn Connection.
      * @param qry Query.
      * @param explain Explain.
@@ -1881,32 +1364,6 @@ public class GridReduceQueryExecutor {
         }
     }
 
-    /** */
-    private Map<ClusterNode, IntArray> narrowForQuery(Map<ClusterNode, 
IntArray> partsMap, int[] parts) {
-        if (parts == null)
-            return partsMap;
-
-        Map<ClusterNode, IntArray> cp = U.newHashMap(partsMap.size());
-
-        for (Map.Entry<ClusterNode, IntArray> entry : partsMap.entrySet()) {
-            IntArray filtered = new IntArray(parts.length);
-
-            IntArray orig = entry.getValue();
-
-            for (int i = 0; i < orig.size(); i++) {
-                int p = orig.get(i);
-
-                if (Arrays.binarySearch(parts, p) >= 0)
-                    filtered.add(p);
-            }
-
-            if (filtered.size() > 0)
-                cp.put(entry.getKey(), filtered);
-        }
-
-        return cp.isEmpty() ? null : cp;
-    }
-
     /**
      * @param qryTimeout Query timeout.
      * @return Query retry timeout.
@@ -1917,79 +1374,4 @@ public class GridReduceQueryExecutor {
 
         return IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, 
DFLT_RETRY_TIMEOUT);
     }
-
-    /** */
-    private static class ExplicitPartitionsSpecializer implements 
C2<ClusterNode, Message, Message> {
-        /** Partitions map. */
-        private final Map<ClusterNode, IntArray> partsMap;
-
-        /**
-         * @param partsMap Partitions map.
-         */
-        public ExplicitPartitionsSpecializer(Map<ClusterNode, IntArray> 
partsMap) {
-            this.partsMap = partsMap;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Message apply(ClusterNode node, Message msg) {
-            if (msg instanceof GridH2QueryRequest) {
-                GridH2QueryRequest rq = new 
GridH2QueryRequest((GridH2QueryRequest)msg);
-
-                rq.queryPartitions(toArray(partsMap.get(node)));
-
-                return rq;
-            } else if (msg instanceof GridH2DmlRequest) {
-                GridH2DmlRequest rq = new 
GridH2DmlRequest((GridH2DmlRequest)msg);
-
-                rq.queryPartitions(toArray(partsMap.get(node)));
-
-                return rq;
-            }
-
-            return msg;
-        }
-    }
-
-    /**
-     * Result of nodes to partitions mapping for a query or update.
-     */
-    static class NodesForPartitionsResult {
-        /** */
-        final Collection<ClusterNode> nodes;
-
-        /** */
-        final Map<ClusterNode, IntArray> partsMap;
-
-        /** */
-        final Map<ClusterNode, IntArray> qryMap;
-
-        /** */
-        NodesForPartitionsResult(Collection<ClusterNode> nodes, 
Map<ClusterNode, IntArray> partsMap,
-            Map<ClusterNode, IntArray> qryMap) {
-            this.nodes = nodes;
-            this.partsMap = partsMap;
-            this.qryMap = qryMap;
-        }
-
-        /**
-         * @return Collection of nodes a message shall be sent to.
-         */
-        Collection<ClusterNode> nodes() {
-            return nodes;
-        }
-
-        /**
-         * @return Maps a node to partition array.
-         */
-        Map<ClusterNode, IntArray> partitionsMap() {
-            return partsMap;
-        }
-
-        /**
-         * @return Maps a node to partition array.
-         */
-        Map<ClusterNode, IntArray> queryPartitionsMap() {
-            return qryMap;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6613376d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java
new file mode 100644
index 0000000..83bbf8e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.h2.util.IntArray;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Result of nodes to partitions mapping for a query or update.
+ */
+public class ReducePartitionMapResult {
+    /** */
+    private final Collection<ClusterNode> nodes;
+
+    /** */
+    private final Map<ClusterNode, IntArray> partsMap;
+
+    /** */
+    private final Map<ClusterNode, IntArray> qryMap;
+
+    /**
+     * Constructor.
+     *
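+     * @param nodes Nodes a message shall be sent to (may be {@code null}).
+     * @param partsMap Node to partitions map (may be {@code null}).
+     * @param qryMap Node to query partitions map (may be {@code null}).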
+     */
+    public ReducePartitionMapResult(Collection<ClusterNode> nodes, 
Map<ClusterNode, IntArray> partsMap,
+        Map<ClusterNode, IntArray> qryMap) {
+        this.nodes = nodes;
+        this.partsMap = partsMap;
+        this.qryMap = qryMap;
+    }
+
+    /**
+     * @return Collection of nodes a message shall be sent to.
+     */
+    public Collection<ClusterNode> nodes() {
+        return nodes;
+    }
+
+    /**
+     * @return Maps a node to partition array.
+     */
+    public Map<ClusterNode, IntArray> partitionsMap() {
+        return partsMap;
+    }
+
+    /**
+     * @return Maps a node to query partition array.
+     */
+    public Map<ClusterNode, IntArray> queryPartitionsMap() {
+        return qryMap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6613376d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
new file mode 100644
index 0000000..1112d19
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.PartitionLossPolicy;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.util.GridIntIterator;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.util.IntArray;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.CacheException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+
+/**
+ * Reduce partition mapper.
+ */
+public class ReducePartitionMapper {
+    /** Marker for partitions that have no affinity mapping configured. */
+    private static final Set<ClusterNode> UNMAPPED_PARTS = 
Collections.emptySet();
+
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     * @param log Logger.
+     */
+    public ReducePartitionMapper(GridKernalContext ctx, IgniteLogger log) {
+        this.ctx = ctx;
+        this.log = log;
+    }
+
+    /**
+     * Evaluates nodes and nodes to partitions map given a list of cache ids, 
topology version and partitions.
+     *
+     * @param cacheIds Cache ids.
+     * @param topVer Topology version.
+     * @param parts Partitions array.
+     * @param isReplicatedOnly Allow only replicated caches.
+     * @param qryId Query ID.
+     * @return Mapping result (its nodes and maps may be {@code null}).
+     */
+    public ReducePartitionMapResult nodesForPartitions(List<Integer> cacheIds, 
AffinityTopologyVersion topVer,
+        int[] parts, boolean isReplicatedOnly, long qryId) {
+        Collection<ClusterNode> nodes = null;
+        Map<ClusterNode, IntArray> partsMap = null;
+        Map<ClusterNode, IntArray> qryMap = null;
+
+        for (int cacheId : cacheIds) {
+            GridCacheContext<?, ?> cctx = cacheContext(cacheId);
+
+            PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
+
+            if (plc != READ_ONLY_SAFE && plc != READ_WRITE_SAFE)
+                continue;
+
+            Collection<Integer> lostParts = cctx.topology().lostPartitions();
+
+            for (int part : lostParts) {
+                if (parts == null || Arrays.binarySearch(parts, part) >= 0) {
+                    throw new CacheException("Failed to execute query because 
cache partition has been " +
+                        "lost [cacheName=" + cctx.name() + ", part=" + part + 
']');
+                }
+            }
+        }
+
+        if (isPreloadingActive(cacheIds)) {
+            if (isReplicatedOnly)
+                nodes = replicatedUnstableDataNodes(cacheIds, qryId);
+            else {
+                partsMap = partitionedUnstableDataNodes(cacheIds, qryId);
+
+                if (partsMap != null) {
+                    qryMap = narrowForQuery(partsMap, parts);
+
+                    nodes = qryMap == null ? null : qryMap.keySet();
+                }
+            }
+        }
+        else {
+            qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, 
parts, qryId);
+
+            if (qryMap != null)
+                nodes = qryMap.keySet();
+        }
+
+        return new ReducePartitionMapResult(nodes, partsMap, qryMap);
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return Cache context.
+     */
+    private GridCacheContext<?,?> cacheContext(Integer cacheId) {
+        return ctx.cache().context().cacheContext(cacheId);
+    }
+
+    /**
+     * @param cacheIds Cache IDs.
+     * @return {@code true} If preloading is active.
+     */
+    private boolean isPreloadingActive(List<Integer> cacheIds) {
+        for (Integer cacheId : cacheIds) {
+            if (null == cacheContext(cacheId))
+                throw new CacheException(String.format("Cache not found on 
local node [cacheId=%d]", cacheId));
+
+            if (hasMovingPartitions(cacheContext(cacheId)))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @return {@code True} If cache has partitions in {@link 
GridDhtPartitionState#MOVING} state.
+     */
+    private static boolean hasMovingPartitions(GridCacheContext<?, ?> cctx) {
+        assert cctx != null;
+
+        return !cctx.isLocal() && cctx.topology().hasMovingPartitions();
+    }
+
+    /**
+     * @param isReplicatedOnly If we must only have replicated caches.
+     * @param topVer Topology version.
+     * @param cacheIds Participating cache IDs.
+     * @param parts Partitions.
+     * @param qryId Query ID.
+     * @return Data nodes or {@code null} if repartitioning started and we 
need to retry.
+     */
+    private Map<ClusterNode, IntArray> stableDataNodes(boolean 
isReplicatedOnly, AffinityTopologyVersion topVer,
+        List<Integer> cacheIds, int[] parts, long qryId) {
+        GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(0));
+
+        // If the first cache is not partitioned, find it (if it's present) 
and move it to index 0.
+        if (!cctx.isPartitioned()) {
+            for (int cacheId = 1; cacheId < cacheIds.size(); cacheId++) {
+                GridCacheContext<?, ?> currCctx = 
cacheContext(cacheIds.get(cacheId));
+
+                if (currCctx.isPartitioned()) {
+                    Collections.swap(cacheIds, 0, cacheId);
+
+                    cctx = currCctx;
+
+                    break;
+                }
+            }
+        }
+
+        Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, 
parts);
+
+        Set<ClusterNode> nodes = map.keySet();
+
+        if (F.isEmpty(map))
+            throw new CacheException("Failed to find data nodes for cache: " + 
cctx.name());
+
+        for (int i = 1; i < cacheIds.size(); i++) {
+            GridCacheContext<?,?> extraCctx = cacheContext(cacheIds.get(i));
+
+            String extraCacheName = extraCctx.name();
+
+            if (extraCctx.isLocal())
+                continue; // No consistency guarantees for local caches.
+
+            if (isReplicatedOnly && !extraCctx.isReplicated())
+                throw new CacheException("Queries running on replicated cache 
should not contain JOINs " +
+                    "with partitioned tables [replicatedCache=" + cctx.name() +
+                    ", partitionedCache=" + extraCacheName + "]");
+
+            Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, 
extraCctx, parts).keySet();
+
+            if (F.isEmpty(extraNodes))
+                throw new CacheException("Failed to find data nodes for cache: 
" + extraCacheName);
+
+            boolean disjoint;
+
+            if (extraCctx.isReplicated()) {
+                if (isReplicatedOnly) {
+                    nodes.retainAll(extraNodes);
+
+                    disjoint = map.isEmpty();
+                }
+                else
+                    disjoint = !extraNodes.containsAll(nodes);
+            }
+            else
+                disjoint = !extraNodes.equals(nodes);
+
+            if (disjoint) {
+                if (isPreloadingActive(cacheIds)) {
+                    logRetry("Failed to calculate nodes for SQL query (got 
disjoint node map during rebalance) " +
+                        "[qryId=" + qryId + ", affTopVer=" + topVer + ", 
cacheIds=" + cacheIds +
+                        ", parts=" + (parts == null ? "[]" : 
Arrays.toString(parts)) +
+                        ", replicatedOnly=" + isReplicatedOnly + ", 
lastCache=" + extraCctx.name() +
+                        ", lastCacheId=" + extraCctx.cacheId() + ']');
+
+                    return null; // Retry.
+                }
+                else
+                    throw new CacheException("Caches have distinct sets of 
data nodes [cache1=" + cctx.name() +
+                        ", cache2=" + extraCacheName + "]");
+            }
+        }
+
+        return map;
+    }
+
+    /**
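+     * Maps nodes to partitions based on the affinity assignment for the given topology version: all assigned
+     * nodes for a replicated cache, primary nodes for a partitioned one.
+     *
+     * @param topVer Topology version.
+     * @param cctx Cache context.
+     * @param parts Partitions.
+     * @return Node to partitions map; partitions are {@code null} for a replicated cache or when {@code parts}
+     *      is {@code null}.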
+     */
+    private Map<ClusterNode, IntArray> 
stableDataNodesMap(AffinityTopologyVersion topVer,
+        final GridCacheContext<?, ?> cctx, @Nullable final int[] parts) {
+
+        Map<ClusterNode, IntArray> mapping = new HashMap<>();
+
+        // Explicit partitions mapping is not applicable to replicated cache.
+        if (cctx.isReplicated()) {
+            for (ClusterNode clusterNode : 
cctx.affinity().assignment(topVer).nodes())
+                mapping.put(clusterNode, null);
+
+            return mapping;
+        }
+
+        List<List<ClusterNode>> assignment = 
cctx.affinity().assignment(topVer).assignment();
+
+        boolean needPartsFilter = parts != null;
+
+        GridIntIterator iter = needPartsFilter ? new 
GridIntList(parts).iterator() :
+            U.forRange(0, cctx.affinity().partitions());
+
+        while(iter.hasNext()) {
+            int partId = iter.next();
+
+            List<ClusterNode> partNodes = assignment.get(partId);
+
+            if (!partNodes.isEmpty()) {
+                ClusterNode prim = partNodes.get(0);
+
+                if (!needPartsFilter) {
+                    mapping.put(prim, null);
+
+                    continue;
+                }
+
+                IntArray partIds = mapping.get(prim);
+
+                if (partIds == null) {
+                    partIds = new IntArray();
+
+                    mapping.put(prim, partIds);
+                }
+
+                partIds.add(partId);
+            }
+        }
+
+        return mapping;
+    }
+
+    /**
+     * Calculates partition mapping for partitioned cache on unstable topology.
+     *
+     * @param cacheIds Cache IDs.
+     * @param qryId Query ID.
+     * @return Partition mapping or {@code null} if we can't calculate it due 
to repartitioning and we need to retry.
+     */
+    @SuppressWarnings("unchecked")
+    private Map<ClusterNode, IntArray> 
partitionedUnstableDataNodes(List<Integer> cacheIds, long qryId) {
+        // If the main cache is replicated, just replace it with the first 
partitioned.
+        GridCacheContext<?,?> cctx = findFirstPartitioned(cacheIds);
+
+        final int partsCnt = cctx.affinity().partitions();
+
+        if (cacheIds.size() > 1) { // Check correct number of partitions for 
partitioned caches.
+            for (Integer cacheId : cacheIds) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
+
+                if (extraCctx.isReplicated() || extraCctx.isLocal())
+                    continue;
+
+                int parts = extraCctx.affinity().partitions();
+
+                if (parts != partsCnt)
+                    throw new CacheException("Number of partitions must be the 
same for correct collocation [cache1=" +
+                        cctx.name() + ", parts1=" + partsCnt + ", cache2=" + 
extraCctx.name() +
+                        ", parts2=" + parts + "]");
+            }
+        }
+
+        Set<ClusterNode>[] partLocs = new Set[partsCnt];
+
+        // Fill partition locations for main cache.
+        for (int p = 0; p < partsCnt; p++) {
+            List<ClusterNode> owners = cctx.topology().owners(p);
+
+            if (F.isEmpty(owners)) {
+                // Handle special case: no mapping is configured for a 
partition.
+                if (F.isEmpty(cctx.affinity().assignment(NONE).get(p))) {
+                    partLocs[p] = UNMAPPED_PARTS; // Mark unmapped partition.
+
+                    continue;
+                }
+                else if (!F.isEmpty(dataNodes(cctx.groupId(), NONE))) {
+                    logRetry("Failed to calculate nodes for SQL query 
(partition has no owners, but corresponding " +
+                        "cache group has data nodes) [qryId=" + qryId + ", 
cacheIds=" + cacheIds +
+                        ", cacheName=" + cctx.name() + ", cacheId=" + 
cctx.cacheId() + ", part=" + p +
+                        ", cacheGroupId=" + cctx.groupId() + ']');
+
+                    return null; // Retry.
+                }
+
+                throw new CacheException("Failed to find data nodes [cache=" + 
cctx.name() + ", part=" + p + "]");
+            }
+
+            partLocs[p] = new HashSet<>(owners);
+        }
+
+        if (cacheIds.size() > 1) {
+            // Find owner intersections for each participating partitioned 
cache partition.
+            // We need this for logical collocation between different 
partitioned caches with the same affinity.
+            for (Integer cacheId : cacheIds) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
+
+                // This is possible if we have replaced a replicated cache 
with a partitioned one earlier.
+                if (cctx == extraCctx)
+                    continue;
+
+                if (extraCctx.isReplicated() || extraCctx.isLocal())
+                    continue;
+
+                for (int p = 0, parts = extraCctx.affinity().partitions(); p < 
parts; p++) {
+                    List<ClusterNode> owners = extraCctx.topology().owners(p);
+
+                    if (partLocs[p] == UNMAPPED_PARTS)
+                        continue; // Skip unmapped partitions.
+
+                    if (F.isEmpty(owners)) {
+                        if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE))) {
+                            logRetry("Failed to calculate nodes for SQL query 
(partition has no owners, but " +
+                                "corresponding cache group has data nodes) 
[qryId=" + qryId +
+                                ", cacheIds=" + cacheIds + ", cacheName=" + 
extraCctx.name() +
+                                ", cacheId=" + extraCctx.cacheId() + ", part=" 
+ p +
+                                ", cacheGroupId=" + extraCctx.groupId() + ']');
+
+                            return null; // Retry.
+                        }
+
+                        throw new CacheException("Failed to find data nodes 
[cache=" + extraCctx.name() +
+                            ", part=" + p + "]");
+                    }
+
+                    if (partLocs[p] == null)
+                        partLocs[p] = new HashSet<>(owners);
+                    else {
+                        partLocs[p].retainAll(owners); // Intersection of 
owners.
+
+                        if (partLocs[p].isEmpty()) {
+                            logRetry("Failed to calculate nodes for SQL query 
(caches have no common data nodes for " +
+                                "partition) [qryId=" + qryId + ", cacheIds=" + 
cacheIds +
+                                ", lastCacheName=" + extraCctx.name() + ", 
lastCacheId=" + extraCctx.cacheId() +
+                                ", part=" + p + ']');
+
+                            return null; // Intersection is empty -> retry.
+                        }
+                    }
+                }
+            }
+
+            // Filter out nodes where not all the replicated caches are loaded.
+            for (Integer cacheId : cacheIds) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
+
+                if (!extraCctx.isReplicated())
+                    continue;
+
+                Set<ClusterNode> dataNodes = 
replicatedUnstableDataNodes(extraCctx, qryId);
+
+                if (F.isEmpty(dataNodes))
+                    return null; // Retry.
+
+                int part = 0;
+
+                for (Set<ClusterNode> partLoc : partLocs) {
+                    if (partLoc == UNMAPPED_PARTS)
+                        continue; // Skip unmapped partition.
+
+                    partLoc.retainAll(dataNodes);
+
+                    if (partLoc.isEmpty()) {
+                        logRetry("Failed to calculate nodes for SQL query 
(caches have no common data nodes for " +
+                            "partition) [qryId=" + qryId + ", cacheIds=" + 
cacheIds +
+                            ", lastReplicatedCacheName=" + extraCctx.name() +
+                            ", lastReplicatedCacheId=" + extraCctx.cacheId() + 
", part=" + part + ']');
+
+                        return null; // Retry.
+                    }
+
+                    part++;
+                }
+            }
+        }
+
+        // Collect the final partitions mapping.
+        Map<ClusterNode, IntArray> res = new HashMap<>();
+
+        // Here partitions in all IntArray's will be sorted in ascending 
order, this is important.
+        for (int p = 0; p < partLocs.length; p++) {
+            Set<ClusterNode> pl = partLocs[p];
+
+            // Skip unmapped partitions.
+            if (pl == UNMAPPED_PARTS)
+                continue;
+
+            assert !F.isEmpty(pl) : pl;
+
+            ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl);
+
+            IntArray parts = res.get(n);
+
+            if (parts == null)
+                res.put(n, parts = new IntArray());
+
+            parts.add(p);
+        }
+
+        return res;
+    }
+
+    /**
+     * Calculates data nodes for replicated caches on unstable topology.
+     *
+     * @param cacheIds Cache IDs.
+     * @param qryId Query ID.
+     * @return Collection of all data nodes owning all the caches or {@code 
null} for retry.
+     */
+    private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> 
cacheIds, long qryId) {
+        int i = 0;
+
+        GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i++));
+
+        // The main cache is allowed to be partitioned.
+        if (!cctx.isReplicated()) {
+            assert cacheIds.size() > 1: "no extra replicated caches with 
partitioned main cache";
+
+            // Just replace the main cache with the first one extra.
+            cctx = cacheContext(cacheIds.get(i++));
+
+            assert cctx.isReplicated(): "all the extra caches must be 
replicated here";
+        }
+
+        Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx, qryId);
+
+        if (F.isEmpty(nodes))
+            return null; // Retry.
+
+        for (;i < cacheIds.size(); i++) {
+            GridCacheContext<?, ?> extraCctx = cacheContext(cacheIds.get(i));
+
+            if (extraCctx.isLocal())
+                continue;
+
+            if (!extraCctx.isReplicated())
+                throw new CacheException("Queries running on replicated cache 
should not contain JOINs " +
+                    "with tables in partitioned caches [replicatedCache=" + 
cctx.name() + ", " +
+                    "partitionedCache=" + extraCctx.name() + "]");
+
+            Set<ClusterNode> extraOwners = 
replicatedUnstableDataNodes(extraCctx, qryId);
+
+            if (F.isEmpty(extraOwners))
+                return null; // Retry.
+
+            nodes.retainAll(extraOwners);
+
+            if (nodes.isEmpty()) {
+                logRetry("Failed to calculate nodes for SQL query (got 
disjoint node map for REPLICATED caches " +
+                    "during rebalance) [qryId=" + qryId + ", cacheIds=" + 
cacheIds +
+                    ", lastCache=" + extraCctx.name() + ", lastCacheId=" + 
extraCctx.cacheId() + ']');
+
+                return null; // Retry.
+            }
+        }
+
+        return nodes;
+    }
+
+    /**
+     * Collects all the nodes owning all the partitions for the given 
replicated cache.
+     *
+     * @param cctx Cache context.
+     * @param qryId Query ID.
+     * @return Owning nodes or {@code null} if we can't find owners for some 
partitions.
+     */
+    private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> 
cctx, long qryId) {
+        assert cctx.isReplicated() : cctx.name() + " must be replicated";
+
+        String cacheName = cctx.name();
+
+        Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(cctx.groupId(), 
NONE));
+
+        if (dataNodes.isEmpty())
+            throw new CacheException("Failed to find data nodes for cache: " + 
cacheName);
+
+        // Find all the nodes owning all the partitions for replicated cache.
+        for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
+            List<ClusterNode> owners = cctx.topology().owners(p);
+
+            if (F.isEmpty(owners)) {
+                logRetry("Failed to calculate nodes for SQL query (partition 
of a REPLICATED cache has no owners) [" +
+                    "qryId=" + qryId + ", cacheName=" + cctx.name() + ", 
cacheId=" + cctx.cacheId() +
+                    ", part=" + p + ']');
+
+                return null; // Retry.
+            }
+
+            dataNodes.retainAll(owners);
+
+            if (dataNodes.isEmpty()) {
+                logRetry("Failed to calculate nodes for SQL query (partitions 
of a REPLICATED has no common owners) [" +
+                    "qryId=" + qryId + ", cacheName=" + cctx.name() + ", 
cacheId=" + cctx.cacheId() +
+                    ", lastPart=" + p + ']');
+
+                return null; // Retry.
+            }
+        }
+
+        return dataNodes;
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     * @param topVer Topology version.
+     * @return Collection of data nodes.
+     */
+    private Collection<ClusterNode> dataNodes(int grpId, 
AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> res = 
ctx.discovery().cacheGroupAffinityNodes(grpId, topVer);
+
+        return res != null ? res : Collections.<ClusterNode>emptySet();
+    }
+
+    /**
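+     * Narrows the partitions map down to the given partitions.
+     *
+     * @param partsMap Partitions map.
+     * @param parts Partitions to keep (looked up by binary search, so assumed sorted) or {@code null}.
+     * @return Narrowed map, {@code partsMap} itself if {@code parts} is {@code null}, or {@code null} if the
+     *      narrowed map is empty.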
+     */
+    private static Map<ClusterNode, IntArray> narrowForQuery(Map<ClusterNode, 
IntArray> partsMap, int[] parts) {
+        if (parts == null)
+            return partsMap;
+
+        Map<ClusterNode, IntArray> cp = U.newHashMap(partsMap.size());
+
+        for (Map.Entry<ClusterNode, IntArray> entry : partsMap.entrySet()) {
+            IntArray filtered = new IntArray(parts.length);
+
+            IntArray orig = entry.getValue();
+
+            for (int i = 0; i < orig.size(); i++) {
+                int p = orig.get(i);
+
+                if (Arrays.binarySearch(parts, p) >= 0)
+                    filtered.add(p);
+            }
+
+            if (filtered.size() > 0)
+                cp.put(entry.getKey(), filtered);
+        }
+
+        return cp.isEmpty() ? null : cp;
+    }
+
+    /**
+     * @param cacheIds Cache IDs.
+     * @return The first partitioned cache context.
+     */
+    private GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) 
{
+        for (int i = 0; i < cacheIds.size(); i++) {
+            GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i));
+
+            if (i == 0 && cctx.isLocal())
+                throw new CacheException("Cache is LOCAL: " + cctx.name());
+
+            if (!cctx.isReplicated() && !cctx.isLocal())
+                return cctx;
+        }
+
+        throw new IllegalStateException("Failed to find partitioned cache.");
+    }
+
+    /**
+     * Logs the reason why nodes could not be calculated and the mapping has to be retried.
+     *
+     * @param msg Message.
+     */
+    private void logRetry(String msg) {
+        log.info(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6613376d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionsSpecializer.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionsSpecializer.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionsSpecializer.java
new file mode 100644
index 0000000..592efee
--- /dev/null
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionsSpecializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.cluster.ClusterNode;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest;
+import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.util.typedef.C2;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.h2.util.IntArray;
+
+import java.util.Map;
+
+/**
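+ * Reducer partitions specializer: copies a query or DML request and sets the partitions mapped to the given
+ * node on the copy. Messages of other types are returned as is.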
+ */
+public class ReducePartitionsSpecializer implements C2<ClusterNode, Message, 
Message> {
+    /** Partitions map. */
+    private final Map<ClusterNode, IntArray> partsMap;
+
+    /**
+     * @param partsMap Partitions map.
+     */
+    public ReducePartitionsSpecializer(Map<ClusterNode, IntArray> partsMap) {
+        this.partsMap = partsMap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Message apply(ClusterNode node, Message msg) {
+        if (msg instanceof GridH2QueryRequest) {
+            GridH2QueryRequest rq = new 
GridH2QueryRequest((GridH2QueryRequest)msg);
+
+            
rq.queryPartitions(GridReduceQueryExecutor.toArray(partsMap.get(node)));
+
+            return rq;
+        } else if (msg instanceof GridH2DmlRequest) {
+            GridH2DmlRequest rq = new GridH2DmlRequest((GridH2DmlRequest)msg);
+
+            
rq.queryPartitions(GridReduceQueryExecutor.toArray(partsMap.get(node)));
+
+            return rq;
+        }
+
+        return msg;
+    }
+}

Reply via email to