Repository: ignite Updated Branches: refs/heads/master bf75ef22d -> 6613376d2
IGNITE-10535: SQL: moved reducer partition resolution logic to separate classes. This closes #5575. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6613376d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6613376d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6613376d Branch: refs/heads/master Commit: 6613376d2cdd9bbe68dd7b3167cfd5c16a39f67e Parents: bf75ef2 Author: devozerov <voze...@gridgain.com> Authored: Thu Dec 6 17:47:50 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Thu Dec 6 17:47:50 2018 +0300 ---------------------------------------------------------------------- .../h2/twostep/GridReduceQueryExecutor.java | 648 +------------------ .../h2/twostep/ReducePartitionMapResult.java | 73 +++ .../query/h2/twostep/ReducePartitionMapper.java | 638 ++++++++++++++++++ .../h2/twostep/ReducePartitionsSpecializer.java | 61 ++ 4 files changed, 787 insertions(+), 633 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6613376d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 36287b3..905a4a3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -22,16 +22,12 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -47,7 +43,6 @@ import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; @@ -61,7 +56,6 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSelectForUpdateFuture; import org.apache.ignite.internal.processors.cache.distributed.near.TxTopologyVersionFuture; @@ -89,8 +83,6 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlReque import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2SelectForUpdateTxDetails; -import org.apache.ignite.internal.util.GridIntIterator; -import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CIX2; @@ -113,9 +105,6 @@ import org.jetbrains.annotations.Nullable; import static java.util.Collections.singletonList; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT; -import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; -import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; -import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx; @@ -138,9 +127,6 @@ public class GridReduceQueryExecutor { private static final String MERGE_INDEX_SORTED = "merge_sorted"; /** */ - private static final Set<ClusterNode> UNMAPPED_PARTS = Collections.emptySet(); - - /** */ private GridKernalContext ctx; /** */ @@ -174,7 +160,12 @@ public class GridReduceQueryExecutor { } }; + /** Partition mapper. */ + private ReducePartitionMapper mapper; + /** + * Constructor. + * * @param qryIdGen Query ID generator. * @param busyLock Busy lock. */ @@ -194,6 +185,8 @@ public class GridReduceQueryExecutor { log = ctx.log(GridReduceQueryExecutor.class); + mapper = new ReducePartitionMapper(ctx, log); + ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { @SuppressWarnings("deprecation") @Override public void onMessage(UUID nodeId, Object msg, byte plc) { @@ -370,32 +363,6 @@ public class GridReduceQueryExecutor { } /** - * @param cacheIds Cache IDs. - * @return {@code true} If preloading is active. - */ - private boolean isPreloadingActive(List<Integer> cacheIds) { - for (Integer cacheId : cacheIds) { - if (null == cacheContext(cacheId)) - throw new CacheException(String.format("Cache not found on local node [cacheId=%d]", cacheId)); - - if (hasMovingPartitions(cacheContext(cacheId))) - return true; - } - - return false; - } - - /** - * @param cctx Cache context. - * @return {@code True} If cache has partitions in {@link GridDhtPartitionState#MOVING} state. - */ - private boolean hasMovingPartitions(GridCacheContext<?, ?> cctx) { - assert cctx != null; - - return !cctx.isLocal() && cctx.topology().hasMovingPartitions(); - } - - /** * @param cacheId Cache ID. * @return Cache context. */ @@ -404,154 +371,6 @@ public class GridReduceQueryExecutor { } /** - * @param topVer Topology version. - * @param cctx Cache context. - * @param parts Partitions. - */ - private Map<ClusterNode, IntArray> stableDataNodesMap(AffinityTopologyVersion topVer, - final GridCacheContext<?, ?> cctx, @Nullable final int[] parts) { - - Map<ClusterNode, IntArray> mapping = new HashMap<>(); - - // Explicit partitions mapping is not applicable to replicated cache. - if (cctx.isReplicated()) { - for (ClusterNode clusterNode : cctx.affinity().assignment(topVer).nodes()) - mapping.put(clusterNode, null); - - return mapping; - } - - List<List<ClusterNode>> assignment = cctx.affinity().assignment(topVer).assignment(); - - boolean needPartsFilter = parts != null; - - GridIntIterator iter = needPartsFilter ? new GridIntList(parts).iterator() : - U.forRange(0, cctx.affinity().partitions()); - - while(iter.hasNext()) { - int partId = iter.next(); - - List<ClusterNode> partNodes = assignment.get(partId); - - if (!partNodes.isEmpty()) { - ClusterNode prim = partNodes.get(0); - - if (!needPartsFilter) { - mapping.put(prim, null); - - continue; - } - - IntArray partIds = mapping.get(prim); - - if (partIds == null) { - partIds = new IntArray(); - - mapping.put(prim, partIds); - } - - partIds.add(partId); - } - } - - return mapping; - } - - /** - * Load failed partition reservation. - * - * @param msg Message. - */ - private void logRetry(String msg) { - log.info(msg); - } - - /** - * @param isReplicatedOnly If we must only have replicated caches. - * @param topVer Topology version. - * @param cacheIds Participating cache IDs. - * @param parts Partitions. - * @param qryId Query ID. - * @return Data nodes or {@code null} if repartitioning started and we need to retry. - */ - private Map<ClusterNode, IntArray> stableDataNodes(boolean isReplicatedOnly, AffinityTopologyVersion topVer, - List<Integer> cacheIds, int[] parts, long qryId) { - GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(0)); - - // If the first cache is not partitioned, find it (if it's present) and move it to index 0. - if (!cctx.isPartitioned()) { - for (int cacheId = 1; cacheId < cacheIds.size(); cacheId++) { - GridCacheContext<?, ?> currCctx = cacheContext(cacheIds.get(cacheId)); - - if (currCctx.isPartitioned()) { - Collections.swap(cacheIds, 0, cacheId); - - cctx = currCctx; - - break; - } - } - } - - Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, parts); - - Set<ClusterNode> nodes = map.keySet(); - - if (F.isEmpty(map)) - throw new CacheException("Failed to find data nodes for cache: " + cctx.name()); - - for (int i = 1; i < cacheIds.size(); i++) { - GridCacheContext<?,?> extraCctx = cacheContext(cacheIds.get(i)); - - String extraCacheName = extraCctx.name(); - - if (extraCctx.isLocal()) - continue; // No consistency guaranties for local caches. - - if (isReplicatedOnly && !extraCctx.isReplicated()) - throw new CacheException("Queries running on replicated cache should not contain JOINs " + - "with partitioned tables [replicatedCache=" + cctx.name() + - ", partitionedCache=" + extraCacheName + "]"); - - Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet(); - - if (F.isEmpty(extraNodes)) - throw new CacheException("Failed to find data nodes for cache: " + extraCacheName); - - boolean disjoint; - - if (extraCctx.isReplicated()) { - if (isReplicatedOnly) { - nodes.retainAll(extraNodes); - - disjoint = map.isEmpty(); - } - else - disjoint = !extraNodes.containsAll(nodes); - } - else - disjoint = !extraNodes.equals(nodes); - - if (disjoint) { - if (isPreloadingActive(cacheIds)) { - logRetry("Failed to calculate nodes for SQL query (got disjoint node map during rebalance) " + - "[qryId=" + qryId + ", affTopVer=" + topVer + ", cacheIds=" + cacheIds + - ", parts=" + (parts == null ? "[]" : Arrays.toString(parts)) + - ", replicatedOnly=" + isReplicatedOnly + ", lastCache=" + extraCctx.name() + - ", lastCacheId=" + extraCctx.cacheId() + ']'); - - return null; // Retry. - } - else - throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + - ", cache2=" + extraCacheName + "]"); - } - } - - return map; - } - - /** * @param schemaName Schema name. * @param qry Query. * @param keepBinary Keep binary. @@ -686,8 +505,8 @@ public class GridReduceQueryExecutor { if (qry.isLocal()) nodes = singletonList(ctx.discovery().localNode()); else { - NodesForPartitionsResult nodesParts = - nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryReqId); + ReducePartitionMapResult nodesParts = + mapper.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryReqId); nodes = nodesParts.nodes(); partsMap = nodesParts.partitionsMap(); @@ -843,7 +662,7 @@ public class GridReduceQueryExecutor { req.mvccSnapshot(mvccTracker.snapshot()); final C2<ClusterNode, Message, Message> pspec = - (parts == null ? null : new ExplicitPartitionsSpecializer(qryMap)); + (parts == null ? null : new ReducePartitionsSpecializer(qryMap)); final C2<ClusterNode, Message, Message> spec; @@ -905,7 +724,7 @@ public class GridReduceQueryExecutor { else // Send failed. retry = true; - Iterator<List<?>> resIter = null; + Iterator<List<?>> resIter; if (!retry) { if (skipMergeTbl) { @@ -1050,7 +869,8 @@ public class GridReduceQueryExecutor { final long reqId = qryIdGen.incrementAndGet(); - NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, reqId); + ReducePartitionMapResult nodesParts = + mapper.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, reqId); final GridRunningQueryInfo qryInfo = new GridRunningQueryInfo(reqId, selectQry, GridCacheQueryType.SQL_FIELDS, schemaName, U.currentTimeMillis(), cancel, false); @@ -1105,8 +925,8 @@ public class GridReduceQueryExecutor { Map<ClusterNode, IntArray> partsMap = (nodesParts.queryPartitionsMap() != null) ? nodesParts.queryPartitionsMap() : nodesParts.partitionsMap(); - ExplicitPartitionsSpecializer partsSpec = (parts == null) ? null : - new ExplicitPartitionsSpecializer(partsMap); + ReducePartitionsSpecializer partsSpec = (parts == null) ? null : + new ReducePartitionsSpecializer(partsMap); final Collection<ClusterNode> finalNodes = nodes; @@ -1290,286 +1110,6 @@ public class GridReduceQueryExecutor { } /** - * Calculates data nodes for replicated caches on unstable topology. - * - * @param cacheIds Cache IDs. - * @param qryId Query ID. - * @return Collection of all data nodes owning all the caches or {@code null} for retry. - */ - private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> cacheIds, long qryId) { - int i = 0; - - GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i++)); - - // The main cache is allowed to be partitioned. - if (!cctx.isReplicated()) { - assert cacheIds.size() > 1: "no extra replicated caches with partitioned main cache"; - - // Just replace the main cache with the first one extra. - cctx = cacheContext(cacheIds.get(i++)); - - assert cctx.isReplicated(): "all the extra caches must be replicated here"; - } - - Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx, qryId); - - if (F.isEmpty(nodes)) - return null; // Retry. - - for (;i < cacheIds.size(); i++) { - GridCacheContext<?, ?> extraCctx = cacheContext(cacheIds.get(i)); - - if (extraCctx.isLocal()) - continue; - - if (!extraCctx.isReplicated()) - throw new CacheException("Queries running on replicated cache should not contain JOINs " + - "with tables in partitioned caches [replicatedCache=" + cctx.name() + ", " + - "partitionedCache=" + extraCctx.name() + "]"); - - Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx, qryId); - - if (F.isEmpty(extraOwners)) - return null; // Retry. - - nodes.retainAll(extraOwners); - - if (nodes.isEmpty()) { - logRetry("Failed to calculate nodes for SQL query (got disjoint node map for REPLICATED caches " + - "during rebalance) [qryId=" + qryId + ", cacheIds=" + cacheIds + - ", lastCache=" + extraCctx.name() + ", lastCacheId=" + extraCctx.cacheId() + ']'); - - return null; // Retry. - } - } - - return nodes; - } - - /** - * @param grpId Cache group ID. - * @param topVer Topology version. - * @return Collection of data nodes. - */ - private Collection<ClusterNode> dataNodes(int grpId, AffinityTopologyVersion topVer) { - Collection<ClusterNode> res = ctx.discovery().cacheGroupAffinityNodes(grpId, topVer); - - return res != null ? res : Collections.<ClusterNode>emptySet(); - } - - /** - * Collects all the nodes owning all the partitions for the given replicated cache. - * - * @param cctx Cache context. - * @param qryId Query ID. - * @return Owning nodes or {@code null} if we can't find owners for some partitions. - */ - private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx, long qryId) { - assert cctx.isReplicated() : cctx.name() + " must be replicated"; - - String cacheName = cctx.name(); - - Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(cctx.groupId(), NONE)); - - if (dataNodes.isEmpty()) - throw new CacheException("Failed to find data nodes for cache: " + cacheName); - - // Find all the nodes owning all the partitions for replicated cache. - for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { - List<ClusterNode> owners = cctx.topology().owners(p); - - if (F.isEmpty(owners)) { - logRetry("Failed to calculate nodes for SQL query (partition of a REPLICATED cache has no owners) [" + - "qryId=" + qryId + ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + - ", part=" + p + ']'); - - return null; // Retry. - } - - dataNodes.retainAll(owners); - - if (dataNodes.isEmpty()) { - logRetry("Failed to calculate nodes for SQL query (partitions of a REPLICATED has no common owners) [" + - "qryId=" + qryId + ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + - ", lastPart=" + p + ']'); - - return null; // Retry. - } - } - - return dataNodes; - } - - /** - * Calculates partition mapping for partitioned cache on unstable topology. - * - * @param cacheIds Cache IDs. - * @param qryId Query ID. - * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry. - */ - @SuppressWarnings("unchecked") - private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(List<Integer> cacheIds, long qryId) { - // If the main cache is replicated, just replace it with the first partitioned. - GridCacheContext<?,?> cctx = findFirstPartitioned(cacheIds); - - final int partsCnt = cctx.affinity().partitions(); - - if (cacheIds.size() > 1) { // Check correct number of partitions for partitioned caches. - for (Integer cacheId : cacheIds) { - GridCacheContext<?, ?> extraCctx = cacheContext(cacheId); - - if (extraCctx.isReplicated() || extraCctx.isLocal()) - continue; - - int parts = extraCctx.affinity().partitions(); - - if (parts != partsCnt) - throw new CacheException("Number of partitions must be the same for correct collocation [cache1=" + - cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() + - ", parts2=" + parts + "]"); - } - } - - Set<ClusterNode>[] partLocs = new Set[partsCnt]; - - // Fill partition locations for main cache. - for (int p = 0; p < partsCnt; p++) { - List<ClusterNode> owners = cctx.topology().owners(p); - - if (F.isEmpty(owners)) { - // Handle special case: no mapping is configured for a partition. - if (F.isEmpty(cctx.affinity().assignment(NONE).get(p))) { - partLocs[p] = UNMAPPED_PARTS; // Mark unmapped partition. - - continue; - } - else if (!F.isEmpty(dataNodes(cctx.groupId(), NONE))) { - logRetry("Failed to calculate nodes for SQL query (partition has no owners, but corresponding " + - "cache group has data nodes) [qryId=" + qryId + ", cacheIds=" + cacheIds + - ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + ", part=" + p + - ", cacheGroupId=" + cctx.groupId() + ']'); - - return null; // Retry. - } - - throw new CacheException("Failed to find data nodes [cache=" + cctx.name() + ", part=" + p + "]"); - } - - partLocs[p] = new HashSet<>(owners); - } - - if (cacheIds.size() > 1) { - // Find owner intersections for each participating partitioned cache partition. - // We need this for logical collocation between different partitioned caches with the same affinity. - for (Integer cacheId : cacheIds) { - GridCacheContext<?, ?> extraCctx = cacheContext(cacheId); - - // This is possible if we have replaced a replicated cache with a partitioned one earlier. - if (cctx == extraCctx) - continue; - - if (extraCctx.isReplicated() || extraCctx.isLocal()) - continue; - - for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) { - List<ClusterNode> owners = extraCctx.topology().owners(p); - - if (partLocs[p] == UNMAPPED_PARTS) - continue; // Skip unmapped partitions. - - if (F.isEmpty(owners)) { - if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE))) { - logRetry("Failed to calculate nodes for SQL query (partition has no owners, but " + - "corresponding cache group has data nodes) [qryId=" + qryId + - ", cacheIds=" + cacheIds + ", cacheName=" + extraCctx.name() + - ", cacheId=" + extraCctx.cacheId() + ", part=" + p + - ", cacheGroupId=" + extraCctx.groupId() + ']'); - - return null; // Retry. - } - - throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + - ", part=" + p + "]"); - } - - if (partLocs[p] == null) - partLocs[p] = new HashSet<>(owners); - else { - partLocs[p].retainAll(owners); // Intersection of owners. - - if (partLocs[p].isEmpty()) { - logRetry("Failed to calculate nodes for SQL query (caches have no common data nodes for " + - "partition) [qryId=" + qryId + ", cacheIds=" + cacheIds + - ", lastCacheName=" + extraCctx.name() + ", lastCacheId=" + extraCctx.cacheId() + - ", part=" + p + ']'); - - return null; // Intersection is empty -> retry. - } - } - } - } - - // Filter nodes where not all the replicated caches loaded. - for (Integer cacheId : cacheIds) { - GridCacheContext<?, ?> extraCctx = cacheContext(cacheId); - - if (!extraCctx.isReplicated()) - continue; - - Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx, qryId); - - if (F.isEmpty(dataNodes)) - return null; // Retry. - - int part = 0; - - for (Set<ClusterNode> partLoc : partLocs) { - if (partLoc == UNMAPPED_PARTS) - continue; // Skip unmapped partition. - - partLoc.retainAll(dataNodes); - - if (partLoc.isEmpty()) { - logRetry("Failed to calculate nodes for SQL query (caches have no common data nodes for " + - "partition) [qryId=" + qryId + ", cacheIds=" + cacheIds + - ", lastReplicatedCacheName=" + extraCctx.name() + - ", lastReplicatedCacheId=" + extraCctx.cacheId() + ", part=" + part + ']'); - - return null; // Retry. - } - - part++; - } - } - } - - // Collect the final partitions mapping. - Map<ClusterNode, IntArray> res = new HashMap<>(); - - // Here partitions in all IntArray's will be sorted in ascending order, this is important. - for (int p = 0; p < partLocs.length; p++) { - Set<ClusterNode> pl = partLocs[p]; - - // Skip unmapped partitions. - if (pl == UNMAPPED_PARTS) - continue; - - assert !F.isEmpty(pl) : pl; - - ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl); - - IntArray parts = res.get(n); - - if (parts == null) - res.put(n, parts = new IntArray()); - - parts.add(p); - } - - return res; - } - - /** * @param c Connection. * @param qry Query. * @param params Query parameters. @@ -1681,63 +1221,6 @@ public class GridReduceQueryExecutor { } /** - * Evaluates nodes and nodes to partitions map given a list of cache ids, topology version and partitions. - * - * @param cacheIds Cache ids. - * @param topVer Topology version. - * @param parts Partitions array. - * @param isReplicatedOnly Allow only replicated caches. - * @param qryId Query ID. - * @return Result. - */ - private NodesForPartitionsResult nodesForPartitions(List<Integer> cacheIds, AffinityTopologyVersion topVer, - int[] parts, boolean isReplicatedOnly, long qryId) { - Collection<ClusterNode> nodes = null; - Map<ClusterNode, IntArray> partsMap = null; - Map<ClusterNode, IntArray> qryMap = null; - - for (int cacheId : cacheIds) { - GridCacheContext<?, ?> cctx = cacheContext(cacheId); - - PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy(); - - if (plc != READ_ONLY_SAFE && plc != READ_WRITE_SAFE) - continue; - - Collection<Integer> lostParts = cctx.topology().lostPartitions(); - - for (int part : lostParts) { - if (parts == null || Arrays.binarySearch(parts, part) >= 0) { - throw new CacheException("Failed to execute query because cache partition has been " + - "lost [cacheName=" + cctx.name() + ", part=" + part + ']'); - } - } - } - - if (isPreloadingActive(cacheIds)) { - if (isReplicatedOnly) - nodes = replicatedUnstableDataNodes(cacheIds, qryId); - else { - partsMap = partitionedUnstableDataNodes(cacheIds, qryId); - - if (partsMap != null) { - qryMap = narrowForQuery(partsMap, parts); - - nodes = qryMap == null ? null : qryMap.keySet(); - } - } - } - else { - qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts, qryId); - - if (qryMap != null) - nodes = qryMap.keySet(); - } - - return new NodesForPartitionsResult(nodes, partsMap, qryMap); - } - - /** * @param conn Connection. * @param qry Query. * @param explain Explain. @@ -1881,32 +1364,6 @@ public class GridReduceQueryExecutor { } } - /** */ - private Map<ClusterNode, IntArray> narrowForQuery(Map<ClusterNode, IntArray> partsMap, int[] parts) { - if (parts == null) - return partsMap; - - Map<ClusterNode, IntArray> cp = U.newHashMap(partsMap.size()); - - for (Map.Entry<ClusterNode, IntArray> entry : partsMap.entrySet()) { - IntArray filtered = new IntArray(parts.length); - - IntArray orig = entry.getValue(); - - for (int i = 0; i < orig.size(); i++) { - int p = orig.get(i); - - if (Arrays.binarySearch(parts, p) >= 0) - filtered.add(p); - } - - if (filtered.size() > 0) - cp.put(entry.getKey(), filtered); - } - - return cp.isEmpty() ? null : cp; - } - /** * @param qryTimeout Query timeout. * @return Query retry timeout. @@ -1917,79 +1374,4 @@ public class GridReduceQueryExecutor { return IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, DFLT_RETRY_TIMEOUT); } - - /** */ - private static class ExplicitPartitionsSpecializer implements C2<ClusterNode, Message, Message> { - /** Partitions map. */ - private final Map<ClusterNode, IntArray> partsMap; - - /** - * @param partsMap Partitions map. - */ - public ExplicitPartitionsSpecializer(Map<ClusterNode, IntArray> partsMap) { - this.partsMap = partsMap; - } - - /** {@inheritDoc} */ - @Override public Message apply(ClusterNode node, Message msg) { - if (msg instanceof GridH2QueryRequest) { - GridH2QueryRequest rq = new GridH2QueryRequest((GridH2QueryRequest)msg); - - rq.queryPartitions(toArray(partsMap.get(node))); - - return rq; - } else if (msg instanceof GridH2DmlRequest) { - GridH2DmlRequest rq = new GridH2DmlRequest((GridH2DmlRequest)msg); - - rq.queryPartitions(toArray(partsMap.get(node))); - - return rq; - } - - return msg; - } - } - - /** - * Result of nodes to partitions mapping for a query or update. - */ - static class NodesForPartitionsResult { - /** */ - final Collection<ClusterNode> nodes; - - /** */ - final Map<ClusterNode, IntArray> partsMap; - - /** */ - final Map<ClusterNode, IntArray> qryMap; - - /** */ - NodesForPartitionsResult(Collection<ClusterNode> nodes, Map<ClusterNode, IntArray> partsMap, - Map<ClusterNode, IntArray> qryMap) { - this.nodes = nodes; - this.partsMap = partsMap; - this.qryMap = qryMap; - } - - /** - * @return Collection of nodes a message shall be sent to. - */ - Collection<ClusterNode> nodes() { - return nodes; - } - - /** - * @return Maps a node to partition array. - */ - Map<ClusterNode, IntArray> partitionsMap() { - return partsMap; - } - - /** - * @return Maps a node to partition array. - */ - Map<ClusterNode, IntArray> queryPartitionsMap() { - return qryMap; - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6613376d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java new file mode 100644 index 0000000..83bbf8e --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.cluster.ClusterNode; +import org.h2.util.IntArray; + +import java.util.Collection; +import java.util.Map; + +/** + * Result of nodes to partitions mapping for a query or update. + */ +public class ReducePartitionMapResult { + /** */ + private final Collection<ClusterNode> nodes; + + /** */ + private final Map<ClusterNode, IntArray> partsMap; + + /** */ + private final Map<ClusterNode, IntArray> qryMap; + + /** + * Constructor. + * + * @param nodes Nodes. + * @param partsMap Partitions map. + * @param qryMap Nodes map. + */ + public ReducePartitionMapResult(Collection<ClusterNode> nodes, Map<ClusterNode, IntArray> partsMap, + Map<ClusterNode, IntArray> qryMap) { + this.nodes = nodes; + this.partsMap = partsMap; + this.qryMap = qryMap; + } + + /** + * @return Collection of nodes a message shall be sent to. + */ + public Collection<ClusterNode> nodes() { + return nodes; + } + + /** + * @return Maps a node to partition array. + */ + public Map<ClusterNode, IntArray> partitionsMap() { + return partsMap; + } + + /** + * @return Maps a node to partition array. + */ + public Map<ClusterNode, IntArray> queryPartitionsMap() { + return qryMap; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6613376d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java new file mode 100644 index 0000000..1112d19 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java @@ -0,0 +1,638 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.PartitionLossPolicy; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.util.GridIntIterator; +import org.apache.ignite.internal.util.GridIntList; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.h2.util.IntArray; +import org.jetbrains.annotations.Nullable; + +import javax.cache.CacheException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; +import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; + +/** + * Reduce partition mapper. + */ +public class ReducePartitionMapper { + /** */ + private static final Set<ClusterNode> UNMAPPED_PARTS = Collections.emptySet(); + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** */ + private final IgniteLogger log; + + /** + * Constructor. + * + * @param ctx Kernal context. + */ + public ReducePartitionMapper(GridKernalContext ctx, IgniteLogger log) { + this.ctx = ctx; + this.log = log; + } + + /** + * Evaluates nodes and nodes to partitions map given a list of cache ids, topology version and partitions. + * + * @param cacheIds Cache ids. + * @param topVer Topology version. + * @param parts Partitions array. + * @param isReplicatedOnly Allow only replicated caches. + * @param qryId Query ID. + * @return Result. + */ + public ReducePartitionMapResult nodesForPartitions(List<Integer> cacheIds, AffinityTopologyVersion topVer, + int[] parts, boolean isReplicatedOnly, long qryId) { + Collection<ClusterNode> nodes = null; + Map<ClusterNode, IntArray> partsMap = null; + Map<ClusterNode, IntArray> qryMap = null; + + for (int cacheId : cacheIds) { + GridCacheContext<?, ?> cctx = cacheContext(cacheId); + + PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy(); + + if (plc != READ_ONLY_SAFE && plc != READ_WRITE_SAFE) + continue; + + Collection<Integer> lostParts = cctx.topology().lostPartitions(); + + for (int part : lostParts) { + if (parts == null || Arrays.binarySearch(parts, part) >= 0) { + throw new CacheException("Failed to execute query because cache partition has been " + + "lost [cacheName=" + cctx.name() + ", part=" + part + ']'); + } + } + } + + if (isPreloadingActive(cacheIds)) { + if (isReplicatedOnly) + nodes = replicatedUnstableDataNodes(cacheIds, qryId); + else { + partsMap = partitionedUnstableDataNodes(cacheIds, qryId); + + if (partsMap != null) { + qryMap = narrowForQuery(partsMap, parts); + + nodes = qryMap == null ? null : qryMap.keySet(); + } + } + } + else { + qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts, qryId); + + if (qryMap != null) + nodes = qryMap.keySet(); + } + + return new ReducePartitionMapResult(nodes, partsMap, qryMap); + } + + /** + * @param cacheId Cache ID. + * @return Cache context. + */ + private GridCacheContext<?,?> cacheContext(Integer cacheId) { + return ctx.cache().context().cacheContext(cacheId); + } + + /** + * @param cacheIds Cache IDs. + * @return {@code true} If preloading is active. + */ + private boolean isPreloadingActive(List<Integer> cacheIds) { + for (Integer cacheId : cacheIds) { + if (null == cacheContext(cacheId)) + throw new CacheException(String.format("Cache not found on local node [cacheId=%d]", cacheId)); + + if (hasMovingPartitions(cacheContext(cacheId))) + return true; + } + + return false; + } + + /** + * @param cctx Cache context. + * @return {@code True} If cache has partitions in {@link GridDhtPartitionState#MOVING} state. + */ + private static boolean hasMovingPartitions(GridCacheContext<?, ?> cctx) { + assert cctx != null; + + return !cctx.isLocal() && cctx.topology().hasMovingPartitions(); + } + + /** + * @param isReplicatedOnly If we must only have replicated caches. + * @param topVer Topology version. + * @param cacheIds Participating cache IDs. + * @param parts Partitions. + * @param qryId Query ID. + * @return Data nodes or {@code null} if repartitioning started and we need to retry. + */ + private Map<ClusterNode, IntArray> stableDataNodes(boolean isReplicatedOnly, AffinityTopologyVersion topVer, + List<Integer> cacheIds, int[] parts, long qryId) { + GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(0)); + + // If the first cache is not partitioned, find it (if it's present) and move it to index 0. + if (!cctx.isPartitioned()) { + for (int cacheId = 1; cacheId < cacheIds.size(); cacheId++) { + GridCacheContext<?, ?> currCctx = cacheContext(cacheIds.get(cacheId)); + + if (currCctx.isPartitioned()) { + Collections.swap(cacheIds, 0, cacheId); + + cctx = currCctx; + + break; + } + } + } + + Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, parts); + + Set<ClusterNode> nodes = map.keySet(); + + if (F.isEmpty(map)) + throw new CacheException("Failed to find data nodes for cache: " + cctx.name()); + + for (int i = 1; i < cacheIds.size(); i++) { + GridCacheContext<?,?> extraCctx = cacheContext(cacheIds.get(i)); + + String extraCacheName = extraCctx.name(); + + if (extraCctx.isLocal()) + continue; // No consistency guaranties for local caches. + + if (isReplicatedOnly && !extraCctx.isReplicated()) + throw new CacheException("Queries running on replicated cache should not contain JOINs " + + "with partitioned tables [replicatedCache=" + cctx.name() + + ", partitionedCache=" + extraCacheName + "]"); + + Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet(); + + if (F.isEmpty(extraNodes)) + throw new CacheException("Failed to find data nodes for cache: " + extraCacheName); + + boolean disjoint; + + if (extraCctx.isReplicated()) { + if (isReplicatedOnly) { + nodes.retainAll(extraNodes); + + disjoint = map.isEmpty(); + } + else + disjoint = !extraNodes.containsAll(nodes); + } + else + disjoint = !extraNodes.equals(nodes); + + if (disjoint) { + if (isPreloadingActive(cacheIds)) { + logRetry("Failed to calculate nodes for SQL query (got disjoint node map during rebalance) " + + "[qryId=" + qryId + ", affTopVer=" + topVer + ", cacheIds=" + cacheIds + + ", parts=" + (parts == null ? "[]" : Arrays.toString(parts)) + + ", replicatedOnly=" + isReplicatedOnly + ", lastCache=" + extraCctx.name() + + ", lastCacheId=" + extraCctx.cacheId() + ']'); + + return null; // Retry. + } + else + throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() + + ", cache2=" + extraCacheName + "]"); + } + } + + return map; + } + + /** + * @param topVer Topology version. + * @param cctx Cache context. + * @param parts Partitions. + */ + private Map<ClusterNode, IntArray> stableDataNodesMap(AffinityTopologyVersion topVer, + final GridCacheContext<?, ?> cctx, @Nullable final int[] parts) { + + Map<ClusterNode, IntArray> mapping = new HashMap<>(); + + // Explicit partitions mapping is not applicable to replicated cache. + if (cctx.isReplicated()) { + for (ClusterNode clusterNode : cctx.affinity().assignment(topVer).nodes()) + mapping.put(clusterNode, null); + + return mapping; + } + + List<List<ClusterNode>> assignment = cctx.affinity().assignment(topVer).assignment(); + + boolean needPartsFilter = parts != null; + + GridIntIterator iter = needPartsFilter ? new GridIntList(parts).iterator() : + U.forRange(0, cctx.affinity().partitions()); + + while(iter.hasNext()) { + int partId = iter.next(); + + List<ClusterNode> partNodes = assignment.get(partId); + + if (!partNodes.isEmpty()) { + ClusterNode prim = partNodes.get(0); + + if (!needPartsFilter) { + mapping.put(prim, null); + + continue; + } + + IntArray partIds = mapping.get(prim); + + if (partIds == null) { + partIds = new IntArray(); + + mapping.put(prim, partIds); + } + + partIds.add(partId); + } + } + + return mapping; + } + + /** + * Calculates partition mapping for partitioned cache on unstable topology. + * + * @param cacheIds Cache IDs. + * @param qryId Query ID. + * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry. + */ + @SuppressWarnings("unchecked") + private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(List<Integer> cacheIds, long qryId) { + // If the main cache is replicated, just replace it with the first partitioned. + GridCacheContext<?,?> cctx = findFirstPartitioned(cacheIds); + + final int partsCnt = cctx.affinity().partitions(); + + if (cacheIds.size() > 1) { // Check correct number of partitions for partitioned caches. + for (Integer cacheId : cacheIds) { + GridCacheContext<?, ?> extraCctx = cacheContext(cacheId); + + if (extraCctx.isReplicated() || extraCctx.isLocal()) + continue; + + int parts = extraCctx.affinity().partitions(); + + if (parts != partsCnt) + throw new CacheException("Number of partitions must be the same for correct collocation [cache1=" + + cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() + + ", parts2=" + parts + "]"); + } + } + + Set<ClusterNode>[] partLocs = new Set[partsCnt]; + + // Fill partition locations for main cache. + for (int p = 0; p < partsCnt; p++) { + List<ClusterNode> owners = cctx.topology().owners(p); + + if (F.isEmpty(owners)) { + // Handle special case: no mapping is configured for a partition. + if (F.isEmpty(cctx.affinity().assignment(NONE).get(p))) { + partLocs[p] = UNMAPPED_PARTS; // Mark unmapped partition. + + continue; + } + else if (!F.isEmpty(dataNodes(cctx.groupId(), NONE))) { + logRetry("Failed to calculate nodes for SQL query (partition has no owners, but corresponding " + + "cache group has data nodes) [qryId=" + qryId + ", cacheIds=" + cacheIds + + ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + ", part=" + p + + ", cacheGroupId=" + cctx.groupId() + ']'); + + return null; // Retry. + } + + throw new CacheException("Failed to find data nodes [cache=" + cctx.name() + ", part=" + p + "]"); + } + + partLocs[p] = new HashSet<>(owners); + } + + if (cacheIds.size() > 1) { + // Find owner intersections for each participating partitioned cache partition. + // We need this for logical collocation between different partitioned caches with the same affinity. + for (Integer cacheId : cacheIds) { + GridCacheContext<?, ?> extraCctx = cacheContext(cacheId); + + // This is possible if we have replaced a replicated cache with a partitioned one earlier. + if (cctx == extraCctx) + continue; + + if (extraCctx.isReplicated() || extraCctx.isLocal()) + continue; + + for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) { + List<ClusterNode> owners = extraCctx.topology().owners(p); + + if (partLocs[p] == UNMAPPED_PARTS) + continue; // Skip unmapped partitions. + + if (F.isEmpty(owners)) { + if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE))) { + logRetry("Failed to calculate nodes for SQL query (partition has no owners, but " + + "corresponding cache group has data nodes) [qryId=" + qryId + + ", cacheIds=" + cacheIds + ", cacheName=" + extraCctx.name() + + ", cacheId=" + extraCctx.cacheId() + ", part=" + p + + ", cacheGroupId=" + extraCctx.groupId() + ']'); + + return null; // Retry. + } + + throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + + ", part=" + p + "]"); + } + + if (partLocs[p] == null) + partLocs[p] = new HashSet<>(owners); + else { + partLocs[p].retainAll(owners); // Intersection of owners. + + if (partLocs[p].isEmpty()) { + logRetry("Failed to calculate nodes for SQL query (caches have no common data nodes for " + + "partition) [qryId=" + qryId + ", cacheIds=" + cacheIds + + ", lastCacheName=" + extraCctx.name() + ", lastCacheId=" + extraCctx.cacheId() + + ", part=" + p + ']'); + + return null; // Intersection is empty -> retry. + } + } + } + } + + // Filter nodes where not all the replicated caches loaded. + for (Integer cacheId : cacheIds) { + GridCacheContext<?, ?> extraCctx = cacheContext(cacheId); + + if (!extraCctx.isReplicated()) + continue; + + Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx, qryId); + + if (F.isEmpty(dataNodes)) + return null; // Retry. + + int part = 0; + + for (Set<ClusterNode> partLoc : partLocs) { + if (partLoc == UNMAPPED_PARTS) + continue; // Skip unmapped partition. + + partLoc.retainAll(dataNodes); + + if (partLoc.isEmpty()) { + logRetry("Failed to calculate nodes for SQL query (caches have no common data nodes for " + + "partition) [qryId=" + qryId + ", cacheIds=" + cacheIds + + ", lastReplicatedCacheName=" + extraCctx.name() + + ", lastReplicatedCacheId=" + extraCctx.cacheId() + ", part=" + part + ']'); + + return null; // Retry. + } + + part++; + } + } + } + + // Collect the final partitions mapping. + Map<ClusterNode, IntArray> res = new HashMap<>(); + + // Here partitions in all IntArray's will be sorted in ascending order, this is important. + for (int p = 0; p < partLocs.length; p++) { + Set<ClusterNode> pl = partLocs[p]; + + // Skip unmapped partitions. + if (pl == UNMAPPED_PARTS) + continue; + + assert !F.isEmpty(pl) : pl; + + ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl); + + IntArray parts = res.get(n); + + if (parts == null) + res.put(n, parts = new IntArray()); + + parts.add(p); + } + + return res; + } + + /** + * Calculates data nodes for replicated caches on unstable topology. + * + * @param cacheIds Cache IDs. + * @param qryId Query ID. + * @return Collection of all data nodes owning all the caches or {@code null} for retry. + */ + private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> cacheIds, long qryId) { + int i = 0; + + GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i++)); + + // The main cache is allowed to be partitioned. + if (!cctx.isReplicated()) { + assert cacheIds.size() > 1: "no extra replicated caches with partitioned main cache"; + + // Just replace the main cache with the first one extra. + cctx = cacheContext(cacheIds.get(i++)); + + assert cctx.isReplicated(): "all the extra caches must be replicated here"; + } + + Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx, qryId); + + if (F.isEmpty(nodes)) + return null; // Retry. + + for (;i < cacheIds.size(); i++) { + GridCacheContext<?, ?> extraCctx = cacheContext(cacheIds.get(i)); + + if (extraCctx.isLocal()) + continue; + + if (!extraCctx.isReplicated()) + throw new CacheException("Queries running on replicated cache should not contain JOINs " + + "with tables in partitioned caches [replicatedCache=" + cctx.name() + ", " + + "partitionedCache=" + extraCctx.name() + "]"); + + Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx, qryId); + + if (F.isEmpty(extraOwners)) + return null; // Retry. + + nodes.retainAll(extraOwners); + + if (nodes.isEmpty()) { + logRetry("Failed to calculate nodes for SQL query (got disjoint node map for REPLICATED caches " + + "during rebalance) [qryId=" + qryId + ", cacheIds=" + cacheIds + + ", lastCache=" + extraCctx.name() + ", lastCacheId=" + extraCctx.cacheId() + ']'); + + return null; // Retry. + } + } + + return nodes; + } + + /** + * Collects all the nodes owning all the partitions for the given replicated cache. + * + * @param cctx Cache context. + * @param qryId Query ID. + * @return Owning nodes or {@code null} if we can't find owners for some partitions. + */ + private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx, long qryId) { + assert cctx.isReplicated() : cctx.name() + " must be replicated"; + + String cacheName = cctx.name(); + + Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(cctx.groupId(), NONE)); + + if (dataNodes.isEmpty()) + throw new CacheException("Failed to find data nodes for cache: " + cacheName); + + // Find all the nodes owning all the partitions for replicated cache. + for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { + List<ClusterNode> owners = cctx.topology().owners(p); + + if (F.isEmpty(owners)) { + logRetry("Failed to calculate nodes for SQL query (partition of a REPLICATED cache has no owners) [" + + "qryId=" + qryId + ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + + ", part=" + p + ']'); + + return null; // Retry. + } + + dataNodes.retainAll(owners); + + if (dataNodes.isEmpty()) { + logRetry("Failed to calculate nodes for SQL query (partitions of a REPLICATED has no common owners) [" + + "qryId=" + qryId + ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + + ", lastPart=" + p + ']'); + + return null; // Retry. + } + } + + return dataNodes; + } + + /** + * @param grpId Cache group ID. + * @param topVer Topology version. + * @return Collection of data nodes. + */ + private Collection<ClusterNode> dataNodes(int grpId, AffinityTopologyVersion topVer) { + Collection<ClusterNode> res = ctx.discovery().cacheGroupAffinityNodes(grpId, topVer); + + return res != null ? res : Collections.<ClusterNode>emptySet(); + } + + /** + * + * @param partsMap Partitions map. + * @param parts Partitions. + * @return Result. + */ + private static Map<ClusterNode, IntArray> narrowForQuery(Map<ClusterNode, IntArray> partsMap, int[] parts) { + if (parts == null) + return partsMap; + + Map<ClusterNode, IntArray> cp = U.newHashMap(partsMap.size()); + + for (Map.Entry<ClusterNode, IntArray> entry : partsMap.entrySet()) { + IntArray filtered = new IntArray(parts.length); + + IntArray orig = entry.getValue(); + + for (int i = 0; i < orig.size(); i++) { + int p = orig.get(i); + + if (Arrays.binarySearch(parts, p) >= 0) + filtered.add(p); + } + + if (filtered.size() > 0) + cp.put(entry.getKey(), filtered); + } + + return cp.isEmpty() ? null : cp; + } + + /** + * @param cacheIds Cache IDs. + * @return The first partitioned cache context. + */ + private GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) { + for (int i = 0; i < cacheIds.size(); i++) { + GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i)); + + if (i == 0 && cctx.isLocal()) + throw new CacheException("Cache is LOCAL: " + cctx.name()); + + if (!cctx.isReplicated() && !cctx.isLocal()) + return cctx; + } + + throw new IllegalStateException("Failed to find partitioned cache."); + } + + /** + * Load failed partition reservation. + * + * @param msg Message. + */ + private void logRetry(String msg) { + log.info(msg); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6613376d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionsSpecializer.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionsSpecializer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionsSpecializer.java new file mode 100644 index 0000000..592efee --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionsSpecializer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.internal.util.typedef.C2; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.h2.util.IntArray; + +import java.util.Map; + +/** + * Reducer partitions specializer. + */ +public class ReducePartitionsSpecializer implements C2<ClusterNode, Message, Message> { + /** Partitions map. */ + private final Map<ClusterNode, IntArray> partsMap; + + /** + * @param partsMap Partitions map. + */ + public ReducePartitionsSpecializer(Map<ClusterNode, IntArray> partsMap) { + this.partsMap = partsMap; + } + + /** {@inheritDoc} */ + @Override public Message apply(ClusterNode node, Message msg) { + if (msg instanceof GridH2QueryRequest) { + GridH2QueryRequest rq = new GridH2QueryRequest((GridH2QueryRequest)msg); + + rq.queryPartitions(GridReduceQueryExecutor.toArray(partsMap.get(node))); + + return rq; + } else if (msg instanceof GridH2DmlRequest) { + GridH2DmlRequest rq = new GridH2DmlRequest((GridH2DmlRequest)msg); + + rq.queryPartitions(GridReduceQueryExecutor.toArray(partsMap.get(node))); + + return rq; + } + + return msg; + } +}