3414: wip.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d17e53d2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d17e53d2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d17e53d2 Branch: refs/heads/ignite-3414 Commit: d17e53d2e19b38de82938fe03039ebf355f41981 Parents: 91d6be9 Author: iveselovskiy <[email protected]> Authored: Wed Jul 6 21:09:58 2016 +0300 Committer: iveselovskiy <[email protected]> Committed: Wed Jul 6 21:09:58 2016 +0300 ---------------------------------------------------------------------- .../IgniteHadoopWeightedMapReducePlanner.java | 74 ++++++++++++++------ .../planner/HadoopMapReducePlanGroup.java | 11 +++ .../HadoopWeightedMapReducePlannerTest.java | 36 ++++++++++ 3 files changed, 98 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d17e53d2/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java index 32cc144..cfe6113 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -115,24 +115,30 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc */ private Mappers assignMappers(Collection<HadoopInputSplit> splits, HadoopMapReducePlanTopology top) throws IgniteCheckedException { - Mappers res = new Mappers(); + Mappers bestSplitMapping = new Mappers(); for (HadoopInputSplit split : splits) { - // Try getting IGFS 
affinity. + // Try getting IGFS affinity. Note that an empty collection may be returned + // if this is an HDFS split that does not belong to any Ignite node host. Collection<UUID> nodeIds = affinityNodesForSplit(split, top); - // Get best node. + // Get best node for this split: UUID node = bestMapperNode(nodeIds, top); - res.add(split, node); + assert node != null; + + bestSplitMapping.add(split, node); } - return res; + return bestSplitMapping; } /** * Get affinity nodes for the given input split. * + * <p/>In general, order in the returned collection *is* significant, meaning that nodes containing more data + * go first. This way, the 1st nodes in the collection considered to be preferable for scheduling. + * * @param split Split. * @param top Topology. * @return Affintiy nodes. @@ -140,11 +146,14 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc */ private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, HadoopMapReducePlanTopology top) throws IgniteCheckedException { + // IGFS part: Collection<UUID> nodeIds = igfsAffinityNodesForSplit(split); + // HDFS part: if (nodeIds == null) { nodeIds = new HashSet<>(); + // TODO: also sort hosts there in non-ascending data order, as we do in case of IGFS affinity splits. for (String host : split.hosts()) { HadoopMapReducePlanGroup grp = top.groupForHost(host); @@ -161,6 +170,9 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc /** * Get IGFS affinity nodes for split if possible. * + * <p/>Order in the returned collection *is* significant, meaning that nodes containing more data + * go first. This way, the 1st nodes in the collection considered to be preferable for scheduling. + * * @param split Input split. * @return IGFS affinity or {@code null} if IGFS is not available. * @throws IgniteCheckedException If failed. 
@@ -207,6 +219,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } } + // Sort the nodes in non-ascending order by contained data lengths. + // NodeIdAndLength objects are used as keys to handle comparison when the lengths are equal. Map<NodeIdAndLength, UUID> res = new TreeMap<>(); for (Map.Entry<UUID, Long> idToLenEntry : idToLen.entrySet()) { @@ -215,7 +229,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc res.put(new NodeIdAndLength(id, idToLenEntry.getValue()), id); } - return new HashSet<>(res.values()); + return new ArrayList<>(res.values()); } } } @@ -233,8 +247,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * @return Result. */ private UUID bestMapperNode(@Nullable Collection<UUID> affIds, HadoopMapReducePlanTopology top) { - // Priority node. - UUID priorityAffId = F.first(affIds); + // Priority node, may be null or empty. + final @Nullable UUID prioAffId = F.first(affIds); // Find group with the least weight. HadoopMapReducePlanGroup resGrp = null; @@ -242,14 +256,15 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc int resWeight = Integer.MAX_VALUE; for (HadoopMapReducePlanGroup grp : top.groups()) { - MapperPriority priority = groupPriority(grp, affIds, priorityAffId); + MapperPriority prio = groupPriority(grp, affIds, prioAffId); int weight = grp.weight() + - (priority == MapperPriority.NORMAL ? rmtMapperWeight : locMapperWeight); + (prio == MapperPriority.NORMAL ? rmtMapperWeight : locMapperWeight); - if (resGrp == null || weight < resWeight || weight == resWeight && priority.value() > resPrio.value()) { + // TODO: may be just consider HIGHEST prio to have load weight (locMapperWeight - 1)? 
+ if (resGrp == null || weight < resWeight || weight == resWeight && prio.value() > resPrio.value()) { resGrp = grp; - resPrio = priority; + resPrio = prio; resWeight = weight; } } @@ -260,7 +275,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc resGrp.weight(resWeight); // Return the best node from the group. - return bestMapperNodeForGroup(resGrp, resPrio, affIds, priorityAffId); + return bestMapperNodeForGroup(resGrp, resPrio, affIds, prioAffId); } /** @@ -288,7 +303,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } case HIGH: { // Pick any affinity node. - assert affIds != null; + assert !F.isEmpty(affIds); List<Integer> cands = new ArrayList<>(); @@ -299,6 +314,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc cands.add(i); } + assert cands.size() >= 1; + idx = cands.get(ThreadLocalRandom.current().nextInt(cands.size())); break; @@ -317,6 +334,9 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } } + // TODO: can we just return 'priorityAffId' there? + // TODO: is that really possible that prio == HIGHEST, but the id is not 'priorityAffId'? + //assert priorityAffId.equals(grp.nodeId(idx)); assert priority == MapperPriority.HIGHEST; } } @@ -351,7 +371,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc res.put(reducerEntry.getKey(), arr); } - assert reducerCnt == cnt; + assert reducerCnt == cnt : reducerCnt + " != " + cnt; return res; } @@ -550,22 +570,30 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * * @param grp Group. * @param affIds Affintiy IDs. - * @param priorityAffId Priority affinity ID. + * @param prioAffId Priority affinity ID. * @return Group priority. 
*/ private static MapperPriority groupPriority(HadoopMapReducePlanGroup grp, @Nullable Collection<UUID> affIds, - @Nullable UUID priorityAffId) { - MapperPriority priority = MapperPriority.NORMAL; + @Nullable UUID prioAffId) { + // prioAffId is actually the 1st element of affIds: + assert F.isEmpty(affIds) ? prioAffId == null : prioAffId == F.first(affIds); + assert grp != null; + + MapperPriority prio = MapperPriority.NORMAL; if (!F.isEmpty(affIds)) { + // Iterate over all the nodes on this machine: for (int i = 0; i < grp.nodeCount(); i++) { UUID id = grp.nodeId(i); if (affIds.contains(id)) { - priority = MapperPriority.HIGH; + // If there is an affinity node among this machine nodes, the prio is HIGH: + prio = MapperPriority.HIGH; - if (F.eq(priorityAffId, id)) { - priority = MapperPriority.HIGHEST; + // If preferred node (node containing most of the data) found on this machine, + // the prio gets HIGHEST: + if (F.eq(prioAffId, id)) { + prio = MapperPriority.HIGHEST; break; } @@ -573,7 +601,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } } - return priority; + return prio; } /** @@ -722,7 +750,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc } /** - * Mappers. + * Bi-Map from node id to a collection of splits. */ private static class Mappers { /** Node-to-splits map. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/d17e53d2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java index fd01d53..8b9f5a3 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java @@ -26,6 +26,14 @@ import java.util.UUID; /** * Map-reduce plan group of nodes. + * In fact, this class represents a physical machine in the cluster that may have zero or more Ignite nodes on board. + * Note that the machine may not be a Hadoop data node. + * + * <p/>The {@code weight} field is an artificial integer value that represents a load (mostly CPU load) assigned to this + * machine. It is assumed to be non-negative. + * + * <p/>The {@code node} field is the first added node (set in the constructor). If more nodes are added, this field is nulled, + * and only {@code nodes} collection is used further. */ public class HadoopMapReducePlanGroup { /** Node. */ @@ -47,6 +55,9 @@ public class HadoopMapReducePlanGroup { * @param macs MAC addresses. 
*/ public HadoopMapReducePlanGroup(ClusterNode node, String macs) { + assert node != null; + assert macs != null; + this.node = node; this.macs = macs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d17e53d2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java index c623756..4fe075c 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java @@ -72,6 +72,12 @@ public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest { /** Host 3. */ private static final String HOST_3 = "host3"; + /** Host 4. */ + private static final String HOST_4 = "host4"; + + /** Host 5. */ + private static final String HOST_5 = "host5"; + /** Standard node 1. */ private static final MockNode NODE_1 = new MockNode(ID_1, MAC_1, HOST_1); @@ -119,6 +125,36 @@ public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest { } /** + * Test one HDFS split being assigned to affinity node. + * + * @throws Exception If failed. 
+ */ + public void testOneHdfsSplitAffinity() throws Exception { + IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); + + Collection<HadoopInputSplit> splits = new ArrayList<>(); + + splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50)); + splits.add(new HadoopFileBlock(new String[] { HOST_2 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100)); + splits.add(new HadoopFileBlock(new String[] { HOST_3 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37)); + // hosts that are out of Ignite topology at all: + splits.add(new HadoopFileBlock(new String[] { HOST_4 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2)); + splits.add(new HadoopFileBlock(new String[] { HOST_5 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3)); + + HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, 3); + + IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); + + HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); + + assertEquals(5, plan.mappers()); + assertEquals(3, plan.mapperNodeIds().size()); + assert plan.mapperNodeIds().contains(ID_1); + + // TODO: enhance tests. + } + + /** * Create planner for IGFS. * * @param igfs IGFS.
