3414: wip.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d17e53d2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d17e53d2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d17e53d2

Branch: refs/heads/ignite-3414
Commit: d17e53d2e19b38de82938fe03039ebf355f41981
Parents: 91d6be9
Author: iveselovskiy <[email protected]>
Authored: Wed Jul 6 21:09:58 2016 +0300
Committer: iveselovskiy <[email protected]>
Committed: Wed Jul 6 21:09:58 2016 +0300

----------------------------------------------------------------------
 .../IgniteHadoopWeightedMapReducePlanner.java   | 74 ++++++++++++++------
 .../planner/HadoopMapReducePlanGroup.java       | 11 +++
 .../HadoopWeightedMapReducePlannerTest.java     | 36 ++++++++++
 3 files changed, 98 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d17e53d2/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 32cc144..cfe6113 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -115,24 +115,30 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      */
     private Mappers assignMappers(Collection<HadoopInputSplit> splits,
         HadoopMapReducePlanTopology top) throws IgniteCheckedException {
-        Mappers res = new Mappers();
+        Mappers bestSplitMapping = new Mappers();
 
         for (HadoopInputSplit split : splits) {
-            // Try getting IGFS affinity.
+            // Try getting IGFS affinity. Note that an empty collection may be 
returned
+            // if this is an HDFS split that does not belong to any Ignite 
node host.
             Collection<UUID> nodeIds = affinityNodesForSplit(split, top);
 
-            // Get best node.
+            // Get best node for this split:
             UUID node = bestMapperNode(nodeIds, top);
 
-            res.add(split, node);
+            assert node != null;
+
+            bestSplitMapping.add(split, node);
         }
 
-        return res;
+        return bestSplitMapping;
     }
 
     /**
      * Get affinity nodes for the given input split.
      *
+     * <p/>In general, order in the returned collection *is* significant, 
meaning that nodes containing more data
+     * go first. This way, the 1st nodes in the collection considered to be 
preferable for scheduling.
+     *
      * @param split Split.
      * @param top Topology.
     * @return Affinity nodes.
@@ -140,11 +146,14 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      */
     private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, 
HadoopMapReducePlanTopology top)
         throws IgniteCheckedException {
+        // IGFS part:
         Collection<UUID> nodeIds = igfsAffinityNodesForSplit(split);
 
+        // HDFS part:
         if (nodeIds == null) {
             nodeIds = new HashSet<>();
 
+            // TODO: also sort hosts there in non-ascending data order, as we 
do in case of IGFS affinity splits.
             for (String host : split.hosts()) {
                 HadoopMapReducePlanGroup grp = top.groupForHost(host);
 
@@ -161,6 +170,9 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
     /**
      * Get IGFS affinity nodes for split if possible.
      *
+     * <p/>Order in the returned collection *is* significant, meaning that 
nodes containing more data
+     * go first. This way, the 1st nodes in the collection considered to be 
preferable for scheduling.
+     *
      * @param split Input split.
      * @return IGFS affinity or {@code null} if IGFS is not available.
      * @throws IgniteCheckedException If failed.
@@ -207,6 +219,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
                                 }
                             }
 
+                            // Sort the nodes in non-ascending order by 
contained data lengths.
+                            // NodeIdAndLength objects are used as keys to 
handle comparison when the lengths are equal.
                             Map<NodeIdAndLength, UUID> res = new TreeMap<>();
 
                             for (Map.Entry<UUID, Long> idToLenEntry : 
idToLen.entrySet()) {
@@ -215,7 +229,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
                                 res.put(new NodeIdAndLength(id, 
idToLenEntry.getValue()), id);
                             }
 
-                            return new HashSet<>(res.values());
+                            return new ArrayList<>(res.values());
                         }
                     }
                 }
@@ -233,8 +247,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      * @return Result.
      */
     private UUID bestMapperNode(@Nullable Collection<UUID> affIds, 
HadoopMapReducePlanTopology top) {
-        // Priority node.
-        UUID priorityAffId = F.first(affIds);
+        // Priority node, may be null or empty.
+        final @Nullable UUID prioAffId = F.first(affIds);
 
         // Find group with the least weight.
         HadoopMapReducePlanGroup resGrp = null;
@@ -242,14 +256,15 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
         int resWeight = Integer.MAX_VALUE;
 
         for (HadoopMapReducePlanGroup grp : top.groups()) {
-            MapperPriority priority = groupPriority(grp, affIds, 
priorityAffId);
+            MapperPriority prio = groupPriority(grp, affIds, prioAffId);
 
             int weight = grp.weight() +
-                (priority == MapperPriority.NORMAL ? rmtMapperWeight : 
locMapperWeight);
+                (prio == MapperPriority.NORMAL ? rmtMapperWeight : 
locMapperWeight);
 
-            if (resGrp == null || weight < resWeight || weight == resWeight && 
priority.value() > resPrio.value()) {
+            // TODO: may be just consider HIGHEST prio to have load weight 
(locMapperWeight - 1)?
+            if (resGrp == null || weight < resWeight || weight == resWeight && 
prio.value() > resPrio.value()) {
                 resGrp = grp;
-                resPrio = priority;
+                resPrio = prio;
                 resWeight = weight;
             }
         }
@@ -260,7 +275,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
         resGrp.weight(resWeight);
 
         // Return the best node from the group.
-        return bestMapperNodeForGroup(resGrp, resPrio, affIds, priorityAffId);
+        return bestMapperNodeForGroup(resGrp, resPrio, affIds, prioAffId);
     }
 
     /**
@@ -288,7 +303,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
                 }
                 case HIGH: {
                     // Pick any affinity node.
-                    assert affIds != null;
+                    assert !F.isEmpty(affIds);
 
                     List<Integer> cands = new ArrayList<>();
 
@@ -299,6 +314,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
                             cands.add(i);
                     }
 
+                    assert cands.size() >= 1;
+
                     idx = 
cands.get(ThreadLocalRandom.current().nextInt(cands.size()));
 
                     break;
@@ -317,6 +334,9 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
                         }
                     }
 
+                    // TODO: can we just return 'priorityAffId' there?
+                    // TODO: is that really possible that prio == HIGHEST, but 
the id is not 'priorityAffId'?
+                    //assert priorityAffId.equals(grp.nodeId(idx));
                     assert priority == MapperPriority.HIGHEST;
                 }
             }
@@ -351,7 +371,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
             res.put(reducerEntry.getKey(), arr);
         }
 
-        assert reducerCnt == cnt;
+        assert reducerCnt == cnt : reducerCnt + " != " + cnt;
 
         return res;
     }
@@ -550,22 +570,30 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
      *
      * @param grp Group.
      * @param affIds Affinity IDs.
-     * @param priorityAffId Priority affinity ID.
+     * @param prioAffId Priority affinity ID.
      * @return Group priority.
      */
     private static MapperPriority groupPriority(HadoopMapReducePlanGroup grp, 
@Nullable Collection<UUID> affIds,
-        @Nullable UUID priorityAffId) {
-        MapperPriority priority = MapperPriority.NORMAL;
+        @Nullable UUID prioAffId) {
+        // prioAffId is actually the 1st element of affIds:
+        assert F.isEmpty(affIds) ? prioAffId == null : prioAffId == 
F.first(affIds);
+        assert grp != null;
+
+        MapperPriority prio = MapperPriority.NORMAL;
 
         if (!F.isEmpty(affIds)) {
+            // Iterate over all the nodes on this machine:
             for (int i = 0; i < grp.nodeCount(); i++) {
                 UUID id = grp.nodeId(i);
 
                 if (affIds.contains(id)) {
-                    priority = MapperPriority.HIGH;
+                    // If there is an affinity node among this machine nodes, 
the prio is HIGH:
+                    prio = MapperPriority.HIGH;
 
-                    if (F.eq(priorityAffId, id)) {
-                        priority = MapperPriority.HIGHEST;
+                    // If preferred node (node containing most of the data) 
found on this machine,
+                    // the prio gets HIGHEST:
+                    if (F.eq(prioAffId, id)) {
+                        prio = MapperPriority.HIGHEST;
 
                         break;
                     }
@@ -573,7 +601,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
             }
         }
 
-        return priority;
+        return prio;
     }
 
     /**
@@ -722,7 +750,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReduc
     }
 
     /**
-     * Mappers.
+     * Bi-Map from node id to a collection of splits.
      */
     private static class Mappers {
         /** Node-to-splits map. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d17e53d2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
index fd01d53..8b9f5a3 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
@@ -26,6 +26,14 @@ import java.util.UUID;
 
 /**
  * Map-reduce plan group of nodes.
+ * In fact, this class represents a physical machine in the cluster that may 
have zero or more Ignite nodes on board.
+ * Note that the machine may not be a Hadoop data node.
+ *
+ * <p/>The {@code weight} field is an artificial integer value that represents 
a load (mostly CPU load) assigned to this
+ * machine. It assumed to be non-negative.
+ *
+ * <p/>{@code node} field is a first added node (set in the constructor). If 
more nodes added, this field is nulled,
+ * and only {@code nodes} collection is used further.
  */
 public class HadoopMapReducePlanGroup {
     /** Node. */
@@ -47,6 +55,9 @@ public class HadoopMapReducePlanGroup {
      * @param macs MAC addresses.
      */
     public HadoopMapReducePlanGroup(ClusterNode node, String macs) {
+        assert node != null;
+        assert macs != null;
+
         this.node = node;
         this.macs = macs;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d17e53d2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java
index c623756..4fe075c 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java
@@ -72,6 +72,12 @@ public class HadoopWeightedMapReducePlannerTest extends 
GridCommonAbstractTest {
     /** Host 3. */
     private static final String HOST_3 = "host3";
 
+    /** Host 4. */
+    private static final String HOST_4 = "host4";
+
+    /** Host 5. */
+    private static final String HOST_5 = "host5";
+
     /** Standard node 1. */
     private static final MockNode NODE_1 = new MockNode(ID_1, MAC_1, HOST_1);
 
@@ -119,6 +125,36 @@ public class HadoopWeightedMapReducePlannerTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Test one HDFS split being assigned to affinity node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOneHdfsSplitAffinity() throws Exception {
+        IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, 
NODE_2).add(100, NODE_3).buildIgfs();
+
+        Collection<HadoopInputSplit> splits = new ArrayList<>();
+
+        splits.add(new HadoopFileBlock(new String[] { HOST_1 }, 
URI.create("hfds://" + HOST_1 + "/x"), 0, 50));
+        splits.add(new HadoopFileBlock(new String[] { HOST_2 }, 
URI.create("hfds://" + HOST_2 + "/x"), 50, 100));
+        splits.add(new HadoopFileBlock(new String[] { HOST_3 }, 
URI.create("hfds://" + HOST_3 + "/x"), 100, 37));
+        // hosts that are out of Ignite topology at all:
+        splits.add(new HadoopFileBlock(new String[] { HOST_4 }, 
URI.create("hfds://" + HOST_4 + "/x"), 138, 2));
+        splits.add(new HadoopFileBlock(new String[] { HOST_5 }, 
URI.create("hfds://" + HOST_5 + "/x"), 140, 3));
+
+        HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, 3);
+
+        IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+
+        HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+
+        assertEquals(5, plan.mappers());
+        assertEquals(3, plan.mapperNodeIds().size());
+        assert plan.mapperNodeIds().contains(ID_1);
+
+        // TODO: enhance tests.
+    }
+
+    /**
      * Create planner for IGFS.
      *
      * @param igfs IGFS.

Reply via email to