Repository: ignite
Updated Branches:
  refs/heads/ignite-3916 551528005 -> 4b66b969c


http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
new file mode 100644
index 0000000..2d1ac0b
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -0,0 +1,846 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.mapreduce;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import 
org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import 
org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import 
org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup;
+import 
org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Map-reduce planner which assigns mappers and reducers based on their 
"weights". Weight describes how much resources
+ * are required to execute particular map or reduce task.
+ * <p>
+ * Plan creation consists of two steps: assigning mappers and assigning 
reducers.
+ * <p>
+ * Mappers are assigned based on input split data location. For each input 
split we search for nodes where
+ * its data is stored. Planner tries to assign mappers to their affinity nodes 
first. This process is governed by two
+ * properties:
+ * <ul>
+ *     <li><b>{@code localMapperWeight}</b> - weight of a map task when it is 
executed on an affinity node;</li>
+ *     <li><b>{@code remoteMapperWeight}</b> - weight of a map task when it is 
executed on a non-affinity node.</li>
+ * </ul>
+ * Planning algorithm assign mappers so that total resulting weight on all 
nodes is minimum possible.
+ * <p>
+ * Reducers are assigned differently. First we try to distribute reducers 
across nodes with mappers. This approach
+ * could minimize expensive data transfer over network. Reducer assigned to a 
node with mapper is considered
+ * <b>{@code local}</b>. Otherwise it is considered <b>{@code remote}</b>. 
This process continue until certain weight
+ * threshold is reached what means that current node is already too busy and 
it should not have higher priority over
+ * other nodes any more. Threshold can be configured using <b>{@code 
preferLocalReducerThresholdWeight}</b> property.
+ * <p>
+ * When local reducer threshold is reached on all nodes, we distribute 
remaining reducers based on their local and
+ * remote weights in the same way as it is done for mappers. This process is 
governed by two
+ * properties:
+ * <ul>
+ *     <li><b>{@code localReducerWeight}</b> - weight of a reduce task when it 
is executed on a node with mappers;</li>
+ *     <li><b>{@code remoteReducerWeight}</b> - weight of a map task when it 
is executed on a node without mappers.</li>
+ * </ul>
+ */
+public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReducePlanner {
+    /** Default local mapper weight. */
+    public static final int DFLT_LOC_MAPPER_WEIGHT = 100;
+
+    /** Default remote mapper weight. */
+    public static final int DFLT_RMT_MAPPER_WEIGHT = 100;
+
+    /** Default local reducer weight. */
+    public static final int DFLT_LOC_REDUCER_WEIGHT = 100;
+
+    /** Default remote reducer weight. */
+    public static final int DFLT_RMT_REDUCER_WEIGHT = 100;
+
+    /** Default reducer migration threshold weight. */
+    public static final int DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT = 200;
+
+    /** Local mapper weight. */
+    private int locMapperWeight = DFLT_LOC_MAPPER_WEIGHT;
+
+    /** Remote mapper weight. */
+    private int rmtMapperWeight = DFLT_RMT_MAPPER_WEIGHT;
+
+    /** Local reducer weight. */
+    private int locReducerWeight = DFLT_LOC_REDUCER_WEIGHT;
+
+    /** Remote reducer weight. */
+    private int rmtReducerWeight = DFLT_RMT_REDUCER_WEIGHT;
+
+    /** Reducer migration threshold weight. */
+    private int preferLocReducerThresholdWeight = 
DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT;
+
+    /** {@inheritDoc} */
+    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, 
Collection<ClusterNode> nodes,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+        List<HadoopInputSplit> splits = 
HadoopCommonUtils.sortInputSplits(job.input());
+        int reducerCnt = job.info().reducers();
+
+        if (reducerCnt < 0)
+            throw new IgniteCheckedException("Number of reducers must be 
non-negative, actual: " + reducerCnt);
+
+        HadoopMapReducePlanTopology top = topology(nodes);
+
+        Mappers mappers = assignMappers(splits, top);
+
+        Map<UUID, int[]> reducers = assignReducers(splits, top, mappers, 
reducerCnt);
+
+        return new HadoopDefaultMapReducePlan(mappers.nodeToSplits, reducers);
+    }
+
+    /**
+     * Assign mappers to nodes.
+     *
+     * @param splits Input splits.
+     * @param top Topology.
+     * @return Mappers.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Mappers assignMappers(Collection<HadoopInputSplit> splits,
+        HadoopMapReducePlanTopology top) throws IgniteCheckedException {
+        Mappers res = new Mappers();
+
+        for (HadoopInputSplit split : splits) {
+            // Try getting IGFS affinity.
+            Collection<UUID> nodeIds = affinityNodesForSplit(split, top);
+
+            // Get best node.
+            UUID node = bestMapperNode(nodeIds, top);
+
+            assert node != null;
+
+            res.add(split, node);
+        }
+
+        return res;
+    }
+
+    /**
+     * Get affinity nodes for the given input split.
+     * <p>
+     * Order in the returned collection *is* significant, meaning that nodes 
containing more data
+     * go first. This way, the 1st nodes in the collection considered to be 
preferable for scheduling.
+     *
+     * @param split Split.
+     * @param top Topology.
+     * @return Affintiy nodes.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, 
HadoopMapReducePlanTopology top)
+        throws IgniteCheckedException {
+        Collection<UUID> igfsNodeIds = igfsAffinityNodesForSplit(split);
+
+        if (igfsNodeIds != null)
+            return igfsNodeIds;
+
+        Map<NodeIdAndLength, UUID> res = new TreeMap<>();
+
+        for (String host : split.hosts()) {
+            long len = split instanceof HadoopFileBlock ? 
((HadoopFileBlock)split).length() : 0L;
+
+            HadoopMapReducePlanGroup grp = top.groupForHost(host);
+
+            if (grp != null) {
+                for (int i = 0; i < grp.nodeCount(); i++) {
+                    UUID nodeId = grp.nodeId(i);
+
+                    res.put(new NodeIdAndLength(nodeId, len), nodeId);
+                }
+            }
+        }
+
+        return new LinkedHashSet<>(res.values());
+    }
+
+    /**
+     * Get IGFS affinity nodes for split if possible.
+     * <p>
+     * Order in the returned collection *is* significant, meaning that nodes 
containing more data
+     * go first. This way, the 1st nodes in the collection considered to be 
preferable for scheduling.
+     *
+     * @param split Input split.
+     * @return IGFS affinity or {@code null} if IGFS is not available.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private Collection<UUID> 
igfsAffinityNodesForSplit(HadoopInputSplit split) throws IgniteCheckedException 
{
+        if (split instanceof HadoopFileBlock) {
+            HadoopFileBlock split0 = (HadoopFileBlock)split;
+
+            if 
(IgniteFileSystem.IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
+                HadoopIgfsEndpoint endpoint = new 
HadoopIgfsEndpoint(split0.file().getAuthority());
+
+                IgfsEx igfs = null;
+
+                if (F.eq(ignite.name(), endpoint.grid()))
+                    igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
+
+                if (igfs != null && !igfs.isProxy(split0.file())) {
+                    IgfsPath path = new IgfsPath(split0.file());
+
+                    if (igfs.exists(path)) {
+                        Collection<IgfsBlockLocation> blocks;
+
+                        try {
+                            blocks = igfs.affinity(path, split0.start(), 
split0.length());
+                        }
+                        catch (IgniteException e) {
+                            throw new IgniteCheckedException("Failed to get 
IGFS file block affinity [path=" + path +
+                                ", start=" + split0.start() + ", len=" + 
split0.length() + ']', e);
+                        }
+
+                        assert blocks != null;
+
+                        if (blocks.size() == 1)
+                            return blocks.iterator().next().nodeIds();
+                        else {
+                            // The most "local" nodes go first.
+                            Map<UUID, Long> idToLen = new HashMap<>();
+
+                            for (IgfsBlockLocation block : blocks) {
+                                for (UUID id : block.nodeIds()) {
+                                    Long len = idToLen.get(id);
+
+                                    idToLen.put(id, len == null ? 
block.length() : block.length() + len);
+                                }
+                            }
+
+                            // Sort the nodes in non-ascending order by 
contained data lengths.
+                            Map<NodeIdAndLength, UUID> res = new TreeMap<>();
+
+                            for (Map.Entry<UUID, Long> idToLenEntry : 
idToLen.entrySet()) {
+                                UUID id = idToLenEntry.getKey();
+
+                                res.put(new NodeIdAndLength(id, 
idToLenEntry.getValue()), id);
+                            }
+
+                            return new LinkedHashSet<>(res.values());
+                        }
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Find best mapper node.
+     *
+     * @param affIds Affinity node IDs.
+     * @param top Topology.
+     * @return Result.
+     */
+    private UUID bestMapperNode(@Nullable Collection<UUID> affIds, 
HadoopMapReducePlanTopology top) {
+        // Priority node.
+        UUID prioAffId = F.first(affIds);
+
+        // Find group with the least weight.
+        HadoopMapReducePlanGroup resGrp = null;
+        MapperPriority resPrio = MapperPriority.NORMAL;
+        int resWeight = Integer.MAX_VALUE;
+
+        for (HadoopMapReducePlanGroup grp : top.groups()) {
+            MapperPriority prio = groupPriority(grp, affIds, prioAffId);
+
+            int weight = grp.weight() + (prio == MapperPriority.NORMAL ? 
rmtMapperWeight : locMapperWeight);
+
+            if (resGrp == null || weight < resWeight || weight == resWeight && 
prio.value() > resPrio.value()) {
+                resGrp = grp;
+                resPrio = prio;
+                resWeight = weight;
+            }
+        }
+
+        assert resGrp != null;
+
+        // Update group weight for further runs.
+        resGrp.weight(resWeight);
+
+        // Return the best node from the group.
+        return bestMapperNodeForGroup(resGrp, resPrio, affIds, prioAffId);
+    }
+
+    /**
+     * Get best node in the group.
+     *
+     * @param grp Group.
+     * @param priority Priority.
+     * @param affIds Affinity IDs.
+     * @param prioAffId Priority affinity IDs.
+     * @return Best node ID in the group.
+     */
+    private static UUID bestMapperNodeForGroup(HadoopMapReducePlanGroup grp, 
MapperPriority priority,
+        @Nullable Collection<UUID> affIds, @Nullable UUID prioAffId) {
+        // Return the best node from the group.
+        int idx = 0;
+
+        // This is rare situation when several nodes are started on the same 
host.
+        if (!grp.single()) {
+            switch (priority) {
+                case NORMAL: {
+                    // Pick any node.
+                    idx = ThreadLocalRandom.current().nextInt(grp.nodeCount());
+
+                    break;
+                }
+                case HIGH: {
+                    // Pick any affinity node.
+                    assert affIds != null;
+
+                    List<Integer> cands = new ArrayList<>();
+
+                    for (int i = 0; i < grp.nodeCount(); i++) {
+                        UUID id = grp.nodeId(i);
+
+                        if (affIds.contains(id))
+                            cands.add(i);
+                    }
+
+                    idx = 
cands.get(ThreadLocalRandom.current().nextInt(cands.size()));
+
+                    break;
+                }
+                default: {
+                    // Find primary node.
+                    assert prioAffId != null;
+
+                    for (int i = 0; i < grp.nodeCount(); i++) {
+                        UUID id = grp.nodeId(i);
+
+                        if (F.eq(id, prioAffId)) {
+                            idx = i;
+
+                            break;
+                        }
+                    }
+
+                    assert priority == MapperPriority.HIGHEST;
+                }
+            }
+        }
+
+        return grp.nodeId(idx);
+    }
+
+    /**
+     * Generate reducers.
+     *
+     * @param splits Input splits.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Reducers.
+     */
+    private Map<UUID, int[]> assignReducers(Collection<HadoopInputSplit> 
splits, HadoopMapReducePlanTopology top,
+        Mappers mappers, int reducerCnt) {
+        Map<UUID, Integer> reducers = assignReducers0(top, splits, mappers, 
reducerCnt);
+
+        int cnt = 0;
+
+        Map<UUID, int[]> res = new HashMap<>(reducers.size());
+
+        for (Map.Entry<UUID, Integer> reducerEntry : reducers.entrySet()) {
+            int[] arr = new int[reducerEntry.getValue()];
+
+            for (int i = 0; i < arr.length; i++)
+                arr[i] = cnt++;
+
+            res.put(reducerEntry.getKey(), arr);
+        }
+
+        assert reducerCnt == cnt : reducerCnt + " != " + cnt;
+
+        return res;
+    }
+
+    /**
+     * Generate reducers.
+     *
+     * @param top Topology.
+     * @param splits Input splits.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Reducers.
+     */
+    private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology 
top, Collection<HadoopInputSplit> splits,
+        Mappers mappers, int reducerCnt) {
+        Map<UUID, Integer> res = new HashMap<>();
+
+        // Assign reducers to splits.
+        Map<HadoopInputSplit, Integer> splitToReducerCnt = 
assignReducersToSplits(splits, reducerCnt);
+
+        // Assign as much local reducers as possible.
+        int remaining = 0;
+
+        for (Map.Entry<HadoopInputSplit, Integer> entry : 
splitToReducerCnt.entrySet()) {
+            HadoopInputSplit split = entry.getKey();
+            int cnt = entry.getValue();
+
+            if (cnt > 0) {
+                int assigned = assignLocalReducers(split, cnt, top, mappers, 
res);
+
+                assert assigned <= cnt;
+
+                remaining += cnt - assigned;
+            }
+        }
+
+        // Assign the rest reducers.
+        if (remaining > 0)
+            assignRemoteReducers(remaining, top, mappers, res);
+
+        return res;
+    }
+
+    /**
+     * Assign local split reducers.
+     *
+     * @param split Split.
+     * @param cnt Reducer count.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param resMap Reducers result map.
+     * @return Number of locally assigned reducers.
+     */
+    private int assignLocalReducers(HadoopInputSplit split, int cnt, 
HadoopMapReducePlanTopology top, Mappers mappers,
+        Map<UUID, Integer> resMap) {
+        // Dereference node.
+        UUID nodeId = mappers.splitToNode.get(split);
+
+        assert nodeId != null;
+
+        // Dereference group.
+        HadoopMapReducePlanGroup grp = top.groupForId(nodeId);
+
+        assert grp != null;
+
+        // Assign more reducers to the node until threshold is reached.
+        int res = 0;
+
+        while (res < cnt && grp.weight() < preferLocReducerThresholdWeight) {
+            res++;
+
+            grp.weight(grp.weight() + locReducerWeight);
+        }
+
+        // Update result map.
+        if (res > 0) {
+            Integer reducerCnt = resMap.get(nodeId);
+
+            resMap.put(nodeId, reducerCnt == null ? res : reducerCnt + res);
+        }
+
+        return res;
+    }
+
+    /**
+     * Assign remote reducers. Assign to the least loaded first.
+     *
+     * @param cnt Count.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param resMap Reducers result map.
+     */
+    private void assignRemoteReducers(int cnt, HadoopMapReducePlanTopology 
top, Mappers mappers,
+        Map<UUID, Integer> resMap) {
+
+        TreeSet<HadoopMapReducePlanGroup> set = new TreeSet<>(new 
GroupWeightComparator());
+
+        set.addAll(top.groups());
+
+        while (cnt-- > 0) {
+            // The least loaded machine.
+            HadoopMapReducePlanGroup grp = set.first();
+
+            // Look for nodes with assigned splits.
+            List<UUID> splitNodeIds = null;
+
+            for (int i = 0; i < grp.nodeCount(); i++) {
+                UUID nodeId = grp.nodeId(i);
+
+                if (mappers.nodeToSplits.containsKey(nodeId)) {
+                    if (splitNodeIds == null)
+                        splitNodeIds = new ArrayList<>(2);
+
+                    splitNodeIds.add(nodeId);
+                }
+            }
+
+            // Select best node.
+            UUID id;
+            int newWeight;
+
+            if (splitNodeIds != null) {
+                id = 
splitNodeIds.get(ThreadLocalRandom.current().nextInt(splitNodeIds.size()));
+
+                newWeight = grp.weight() + locReducerWeight;
+            }
+            else {
+                id = 
grp.nodeId(ThreadLocalRandom.current().nextInt(grp.nodeCount()));
+
+                newWeight = grp.weight() + rmtReducerWeight;
+            }
+
+            // Re-add entry with new weight.
+            boolean rmv = set.remove(grp);
+
+            assert rmv;
+
+            grp.weight(newWeight);
+
+            boolean add = set.add(grp);
+
+            assert add;
+
+            // Update result map.
+            Integer res = resMap.get(id);
+
+            resMap.put(id, res == null ? 1 : res + 1);
+        }
+    }
+
+    /**
+     * Comparator based on group's weight.
+     */
+    private static class GroupWeightComparator implements 
Comparator<HadoopMapReducePlanGroup> {
+        /** {@inheritDoc} */
+        @Override public int compare(HadoopMapReducePlanGroup first, 
HadoopMapReducePlanGroup second) {
+            int res = first.weight() - second.weight();
+
+            if (res < 0)
+                return -1;
+            else if (res > 0)
+                return 1;
+            else
+                return first.macs().compareTo(second.macs());
+        }
+    }
+
+    /**
+     * Distribute reducers between splits.
+     *
+     * @param splits Splits.
+     * @param reducerCnt Reducer count.
+     * @return Map from input split to reducer count.
+     */
+    private Map<HadoopInputSplit, Integer> 
assignReducersToSplits(Collection<HadoopInputSplit> splits,
+        int reducerCnt) {
+        Map<HadoopInputSplit, Integer> res = new 
IdentityHashMap<>(splits.size());
+
+        int base = reducerCnt / splits.size();
+        int remainder = reducerCnt % splits.size();
+
+        for (HadoopInputSplit split : splits) {
+            int val = base;
+
+            if (remainder > 0) {
+                val++;
+
+                remainder--;
+            }
+
+            res.put(split, val);
+        }
+
+        assert remainder == 0;
+
+        return res;
+    }
+
+    /**
+     * Calculate group priority.
+     *
+     * @param grp Group.
+     * @param affIds Affinity IDs.
+     * @param prioAffId Priority affinity ID.
+     * @return Group priority.
+     */
+    private static MapperPriority groupPriority(HadoopMapReducePlanGroup grp, 
@Nullable Collection<UUID> affIds,
+        @Nullable UUID prioAffId) {
+        assert F.isEmpty(affIds) ? prioAffId == null : prioAffId == 
F.first(affIds);
+        assert grp != null;
+
+        MapperPriority prio = MapperPriority.NORMAL;
+
+        if (!F.isEmpty(affIds)) {
+            for (int i = 0; i < grp.nodeCount(); i++) {
+                UUID id = grp.nodeId(i);
+
+                if (affIds.contains(id)) {
+                    prio = MapperPriority.HIGH;
+
+                    if (F.eq(prioAffId, id)) {
+                        prio = MapperPriority.HIGHEST;
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        return prio;
+    }
+
+    /**
+     * Get local mapper weight. This weight is added to a node when a mapper 
is assigned and it's input split data is
+     * located on this node (at least partially).
+     * <p>
+     * Defaults to {@link #DFLT_LOC_MAPPER_WEIGHT}.
+     *
+     * @return Remote mapper weight.
+     */
+    public int getLocalMapperWeight() {
+        return locMapperWeight;
+    }
+
+    /**
+     * Set local mapper weight. See {@link #getLocalMapperWeight()} for more 
information.
+     *
+     * @param locMapperWeight Local mapper weight.
+     */
+    public void setLocalMapperWeight(int locMapperWeight) {
+        this.locMapperWeight = locMapperWeight;
+    }
+
+    /**
+     * Get remote mapper weight. This weight is added to a node when a mapper 
is assigned, but it's input
+     * split data is not located on this node.
+     * <p>
+     * Defaults to {@link #DFLT_RMT_MAPPER_WEIGHT}.
+     *
+     * @return Remote mapper weight.
+     */
+    public int getRemoteMapperWeight() {
+        return rmtMapperWeight;
+    }
+
+    /**
+     * Set remote mapper weight. See {@link #getRemoteMapperWeight()} for more 
information.
+     *
+     * @param rmtMapperWeight Remote mapper weight.
+     */
+    public void setRemoteMapperWeight(int rmtMapperWeight) {
+        this.rmtMapperWeight = rmtMapperWeight;
+    }
+
+    /**
+     * Get local reducer weight. This weight is added to a node when a reducer 
is assigned and the node have at least
+     * one assigned mapper.
+     * <p>
+     * Defaults to {@link #DFLT_LOC_REDUCER_WEIGHT}.
+     *
+     * @return Local reducer weight.
+     */
+    public int getLocalReducerWeight() {
+        return locReducerWeight;
+    }
+
+    /**
+     * Set local reducer weight. See {@link #getLocalReducerWeight()} for more 
information.
+     *
+     * @param locReducerWeight Local reducer weight.
+     */
+    public void setLocalReducerWeight(int locReducerWeight) {
+        this.locReducerWeight = locReducerWeight;
+    }
+
+    /**
+     * Get remote reducer weight. This weight is added to a node when a 
reducer is assigned, but the node doesn't have
+     * any assigned mappers.
+     * <p>
+     * Defaults to {@link #DFLT_RMT_REDUCER_WEIGHT}.
+     *
+     * @return Remote reducer weight.
+     */
+    public int getRemoteReducerWeight() {
+        return rmtReducerWeight;
+    }
+
+    /**
+     * Set remote reducer weight. See {@link #getRemoteReducerWeight()} for 
more information.
+     *
+     * @param rmtReducerWeight Remote reducer weight.
+     */
+    public void setRemoteReducerWeight(int rmtReducerWeight) {
+        this.rmtReducerWeight = rmtReducerWeight;
+    }
+
+    /**
+     * Get reducer migration threshold weight. When threshold is reached, a 
node with mappers is no longer considered
+     * as preferred for further reducer assignments.
+     * <p>
+     * Defaults to {@link #DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT}.
+     *
+     * @return Reducer migration threshold weight.
+     */
+    public int getPreferLocalReducerThresholdWeight() {
+        return preferLocReducerThresholdWeight;
+    }
+
+    /**
+     * Set reducer migration threshold weight. See {@link 
#getPreferLocalReducerThresholdWeight()} for more
+     * information.
+     *
+     * @param reducerMigrationThresholdWeight Reducer migration threshold 
weight.
+     */
+    public void setPreferLocalReducerThresholdWeight(int 
reducerMigrationThresholdWeight) {
+        this.preferLocReducerThresholdWeight = reducerMigrationThresholdWeight;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteHadoopWeightedMapReducePlanner.class, this);
+    }
+
+    /**
+     * Node ID and length.
+     */
+    private static class NodeIdAndLength implements 
Comparable<NodeIdAndLength> {
+        /** Node ID. */
+        private final UUID id;
+
+        /** Length. */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param id Node ID.
+         * @param len Length.
+         */
+        public NodeIdAndLength(UUID id, long len) {
+            this.id = id;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("NullableProblems")
+        @Override public int compareTo(NodeIdAndLength obj) {
+            long res = len - obj.len;
+
+            if (res > 0)
+                return -1;
+            else if (res < 0)
+                return 1;
+            else
+                return id.compareTo(obj.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof NodeIdAndLength && F.eq(id, 
((NodeIdAndLength)obj).id);
+        }
+    }
+
+    /**
+     * Mappers.
+     */
+    private static class Mappers {
+        /** Node-to-splits map. */
+        private final Map<UUID, Collection<HadoopInputSplit>> nodeToSplits = 
new HashMap<>();
+
+        /** Split-to-node map. */
+        private final Map<HadoopInputSplit, UUID> splitToNode = new 
IdentityHashMap<>();
+
+        /**
+         * Add mapping.
+         *
+         * @param split Split.
+         * @param node Node.
+         */
+        public void add(HadoopInputSplit split, UUID node) {
+            Collection<HadoopInputSplit> nodeSplits = nodeToSplits.get(node);
+
+            if (nodeSplits == null) {
+                nodeSplits = new HashSet<>();
+
+                nodeToSplits.put(node, nodeSplits);
+            }
+
+            nodeSplits.add(split);
+
+            splitToNode.put(split, node);
+        }
+    }
+
+    /**
+     * Mapper priority enumeration.
+     */
+    private enum MapperPriority {
+        /** Normal node. */
+        NORMAL(0),
+
+        /** (likely) Affinity node. */
+        HIGH(1),
+
+        /** (likely) Affinity node with the highest priority (e.g. because it 
hosts more data than other nodes). */
+        HIGHEST(2);
+
+        /** Value. */
+        private final int val;
+
+        /**
+         * Constructor.
+         *
+         * @param val Value.
+         */
+        MapperPriority(int val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public int value() {
+            return val;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
new file mode 100644
index 0000000..83f94ce
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeSet;
+
+/**
+ * Common Hadoop utility methods which do not depend on Hadoop API.
+ */
+public class HadoopCommonUtils {
+    /**
+     * Sort input splits by length.
+     *
+     * @param splits Splits.
+     * @return Sorted splits.
+     */
+    public static List<HadoopInputSplit> 
sortInputSplits(Collection<HadoopInputSplit> splits) {
+        int id = 0;
+
+        TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>();
+
+        for (HadoopInputSplit split : splits) {
+            long len = split instanceof HadoopFileBlock ? 
((HadoopFileBlock)split).length() : 0;
+
+            sortedSplits.add(new SplitSortWrapper(id++, split, len));
+        }
+
+        ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size());
+
+        for (SplitSortWrapper sortedSplit : sortedSplits)
+            res.add(sortedSplit.split);
+
+        return res;
+    }
+
+    /**
+     * Split wrapper for sorting.
+     */
+    private static class SplitSortWrapper implements 
Comparable<SplitSortWrapper> {
+        /** Unique ID. */
+        private final int id;
+
+        /** Split. */
+        private final HadoopInputSplit split;
+
+        /** Split length. */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param id Unique ID.
+         * @param split Split.
+         * @param len Split length.
+         */
+        public SplitSortWrapper(int id, HadoopInputSplit split, long len) {
+            this.id = id;
+            this.split = split;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("NullableProblems")
+        @Override public int compareTo(SplitSortWrapper other) {
+            long res = len - other.len;
+
+            if (res > 0)
+                return -1;
+            else if (res < 0)
+                return 1;
+            else
+                return id - other.id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof SplitSortWrapper && id == 
((SplitSortWrapper)obj).id;
+        }
+    }
+
+    /**
+     * Private constructor.
+     */
+    private HadoopCommonUtils() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
new file mode 100644
index 0000000..a44e1ae
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * IGFS endpoint abstraction.
+ */
+public class HadoopIgfsEndpoint {
+    /** Localhost. */
+    public static final String LOCALHOST = "127.0.0.1";
+
+    /** IGFS name. */
+    private final String igfsName;
+
+    /** Grid name. */
+    private final String gridName;
+
+    /** Host. */
+    private final String host;
+
+    /** Port. */
+    private final int port;
+
+    /**
+     * Normalize IGFS URI.
+     *
+     * @param uri URI.
+     * @return Normalized URI.
+     * @throws IOException If failed.
+     */
+    public static URI normalize(URI uri) throws IOException {
+        try {
+            if (!F.eq(IgniteFileSystem.IGFS_SCHEME, uri.getScheme()))
+                throw new IOException("Failed to normalize UIR because it has 
non IGFS scheme: " + uri);
+
+            HadoopIgfsEndpoint endpoint = new 
HadoopIgfsEndpoint(uri.getAuthority());
+
+            StringBuilder sb = new StringBuilder();
+
+            if (endpoint.igfs() != null)
+                sb.append(endpoint.igfs());
+
+            if (endpoint.grid() != null)
+                sb.append(":").append(endpoint.grid());
+
+            return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : 
null, endpoint.host(), endpoint.port(),
+                uri.getPath(), uri.getQuery(), uri.getFragment());
+        }
+        catch (URISyntaxException | IgniteCheckedException e) {
+            throw new IOException("Failed to normalize URI: " + uri, e);
+        }
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param connStr Connection string.
+     * @throws IgniteCheckedException If failed to parse connection string.
+     */
+    public HadoopIgfsEndpoint(@Nullable String connStr) throws 
IgniteCheckedException {
+        if (connStr == null)
+            connStr = "";
+
+        String[] tokens = connStr.split("@", -1);
+
+        IgniteBiTuple<String, Integer> hostPort;
+
+        if (tokens.length == 1) {
+            igfsName = null;
+            gridName = null;
+
+            hostPort = hostPort(connStr, connStr);
+        }
+        else if (tokens.length == 2) {
+            String authStr = tokens[0];
+
+            if (authStr.isEmpty()) {
+                gridName = null;
+                igfsName = null;
+            }
+            else {
+                String[] authTokens = authStr.split(":", -1);
+
+                igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0];
+
+                if (authTokens.length == 1)
+                    gridName = null;
+                else if (authTokens.length == 2)
+                    gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1];
+                else
+                    throw new IgniteCheckedException("Invalid connection 
string format: " + connStr);
+            }
+
+            hostPort = hostPort(connStr, tokens[1]);
+        }
+        else
+            throw new IgniteCheckedException("Invalid connection string 
format: " + connStr);
+
+        host = hostPort.get1();
+
+        assert hostPort.get2() != null;
+
+        port = hostPort.get2();
+    }
+
+    /**
+     * Parse host and port.
+     *
+     * @param connStr Full connection string.
+     * @param hostPortStr Host/port connection string part.
+     * @return Tuple with host and port.
+     * @throws IgniteCheckedException If failed to parse connection string.
+     */
+    private IgniteBiTuple<String, Integer> hostPort(String connStr, String 
hostPortStr) throws IgniteCheckedException {
+        String[] tokens = hostPortStr.split(":", -1);
+
+        String host = tokens[0];
+
+        if (F.isEmpty(host))
+            host = LOCALHOST;
+
+        int port;
+
+        if (tokens.length == 1)
+            port = IgfsIpcEndpointConfiguration.DFLT_PORT;
+        else if (tokens.length == 2) {
+            String portStr = tokens[1];
+
+            try {
+                port = Integer.valueOf(portStr);
+
+                if (port < 0 || port > 65535)
+                    throw new IgniteCheckedException("Invalid port number: " + 
connStr);
+            }
+            catch (NumberFormatException e) {
+                throw new IgniteCheckedException("Invalid port number: " + 
connStr);
+            }
+        }
+        else
+            throw new IgniteCheckedException("Invalid connection string 
format: " + connStr);
+
+        return F.t(host, port);
+    }
+
+    /**
+     * @return IGFS name.
+     */
+    @Nullable public String igfs() {
+        return igfsName;
+    }
+
+    /**
+     * @return Grid name.
+     */
+    @Nullable public String grid() {
+        return gridName;
+    }
+
+    /**
+     * @return Host.
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * @return Host.
+     */
+    public boolean isLocal() {
+        return F.eq(LOCALHOST, host);
+    }
+
+    /**
+     * @return Port.
+     */
+    public int port() {
+        return port;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopIgfsEndpoint.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
new file mode 100644
index 0000000..f01f72b
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
+
+/**
+ * Base class for map-reduce planners.
+ */
+public abstract class HadoopAbstractMapReducePlanner implements 
HadoopMapReducePlanner {
+    /** Injected grid. */
+    @IgniteInstanceResource
+    protected Ignite ignite;
+
+    /** Logger. */
+    @SuppressWarnings("UnusedDeclaration")
+    @LoggerResource
+    protected IgniteLogger log;
+
+    /**
+     * Create plan topology.
+     *
+     * @param nodes Topology nodes.
+     * @return Plan topology.
+     */
+    protected static HadoopMapReducePlanTopology 
topology(Collection<ClusterNode> nodes) {
+        Map<String, HadoopMapReducePlanGroup> macsMap = new 
HashMap<>(nodes.size());
+
+        Map<UUID, HadoopMapReducePlanGroup> idToGrp = new 
HashMap<>(nodes.size());
+        Map<String, HadoopMapReducePlanGroup> hostToGrp = new 
HashMap<>(nodes.size());
+
+        for (ClusterNode node : nodes) {
+            String macs = node.attribute(ATTR_MACS);
+
+            HadoopMapReducePlanGroup grp = macsMap.get(macs);
+
+            if (grp == null) {
+                grp = new HadoopMapReducePlanGroup(node, macs);
+
+                macsMap.put(macs, grp);
+            }
+            else
+                grp.add(node);
+
+            idToGrp.put(node.id(), grp);
+
+            for (String host : node.addresses()) {
+                HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host);
+
+                if (hostGrp == null)
+                    hostToGrp.put(host, grp);
+                else
+                    assert hostGrp == grp;
+            }
+        }
+
+        return new HadoopMapReducePlanTopology(new 
ArrayList<>(macsMap.values()), idToGrp, hostToGrp);
+    }
+
+
+    /**
+     * Groups nodes by host names.
+     *
+     * @param top Topology to group.
+     * @return Map.
+     */
+    protected static Map<String, Collection<UUID>> 
groupByHost(Collection<ClusterNode> top) {
+        Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
+
+        for (ClusterNode node : top) {
+            for (String host : node.hostNames()) {
+                Collection<UUID> nodeIds = grouped.get(host);
+
+                if (nodeIds == null) {
+                    // Expecting 1-2 nodes per host.
+                    nodeIds = new ArrayList<>(2);
+
+                    grouped.put(host, nodeIds);
+                }
+
+                nodeIds.add(node.id());
+            }
+        }
+
+        return grouped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
new file mode 100644
index 0000000..7aaf3fa
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Map-reduce plan.
+ */
+public class HadoopDefaultMapReducePlan implements HadoopMapReducePlan {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Mappers map. */
+    private Map<UUID, Collection<HadoopInputSplit>> mappers;
+
+    /** Reducers map. */
+    private Map<UUID, int[]> reducers;
+
+    /** Mappers count. */
+    private int mappersCnt;
+
+    /** Reducers count. */
+    private int reducersCnt;
+
+    /**
+     * @param mappers Mappers map.
+     * @param reducers Reducers map.
+     */
+    public HadoopDefaultMapReducePlan(Map<UUID, Collection<HadoopInputSplit>> 
mappers,
+        Map<UUID, int[]> reducers) {
+        this.mappers = mappers;
+        this.reducers = reducers;
+
+        if (mappers != null) {
+            for (Collection<HadoopInputSplit> splits : mappers.values())
+                mappersCnt += splits.size();
+        }
+
+        if (reducers != null) {
+            for (int[] rdcrs : reducers.values())
+                reducersCnt += rdcrs.length;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int mappers() {
+        return mappersCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int reducers() {
+        return reducersCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID nodeForReducer(int reducer) {
+        assert reducer >= 0 && reducer < reducersCnt : reducer;
+
+        for (Map.Entry<UUID, int[]> entry : reducers.entrySet()) {
+            for (int r : entry.getValue()) {
+                if (r == reducer)
+                    return entry.getKey();
+            }
+        }
+
+        throw new IllegalStateException("Not found reducer index: " + reducer);
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Collection<HadoopInputSplit> mappers(UUID 
nodeId) {
+        return mappers.get(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public int[] reducers(UUID nodeId) {
+        return reducers.get(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<UUID> mapperNodeIds() {
+        return mappers.keySet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<UUID> reducerNodeIds() {
+        return reducers.keySet();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
new file mode 100644
index 0000000..2fe8682
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.ArrayList;
+import java.util.UUID;
+
+/**
+ * Map-reduce plan group of nodes on a single physical machine.
+ */
+public class HadoopMapReducePlanGroup {
+    /** Node. */
+    private ClusterNode node;
+
+    /** Nodes. */
+    private ArrayList<ClusterNode> nodes;
+
+    /** MAC addresses. */
+    private final String macs;
+
+    /** Weight. */
+    private int weight;
+
+    /**
+     * Constructor.
+     *
+     * @param node First node in the group.
+     * @param macs MAC addresses.
+     */
+    public HadoopMapReducePlanGroup(ClusterNode node, String macs) {
+        assert node != null;
+        assert macs != null;
+
+        this.node = node;
+        this.macs = macs;
+    }
+
+    /**
+     * Add node to the group.
+     *
+     * @param newNode New node.
+     */
+    public void add(ClusterNode newNode) {
+        if (node != null) {
+            nodes = new ArrayList<>(2);
+
+            nodes.add(node);
+
+            node = null;
+        }
+
+        nodes.add(newNode);
+    }
+
+    /**
+     * @return MAC addresses.
+     */
+    public String macs() {
+        return macs;
+    }
+
+    /**
+     * @return {@code True} if only sinle node present.
+     */
+    public boolean single() {
+        return nodeCount() == 1;
+    }
+
+    /**
+     * Get node ID by index.
+     *
+     * @param idx Index.
+     * @return Node.
+     */
+    public UUID nodeId(int idx) {
+        ClusterNode res;
+
+        if (node != null) {
+            assert idx == 0;
+
+            res = node;
+        }
+        else {
+            assert nodes != null;
+            assert idx < nodes.size();
+
+            res = nodes.get(idx);
+        }
+
+        assert res != null;
+
+        return res.id();
+    }
+
+    /**
+     * @return Node count.
+     */
+    public int nodeCount() {
+        return node != null ? 1 : nodes.size();
+    }
+
+    /**
+     * @return weight.
+     */
+    public int weight() {
+        return weight;
+    }
+
+    /**
+     * @param weight weight.
+     */
+    public void weight(int weight) {
+        this.weight = weight;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return macs.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return obj instanceof HadoopMapReducePlanGroup && F.eq(macs, 
((HadoopMapReducePlanGroup)obj).macs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopMapReducePlanGroup.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b66b969/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
new file mode 100644
index 0000000..fa5c469
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Map-reduce plan topology.
+ */
+public class HadoopMapReducePlanTopology {
+    /** All groups. */
+    private final List<HadoopMapReducePlanGroup> grps;
+
+    /** Node ID to group map. */
+    private final Map<UUID, HadoopMapReducePlanGroup> idToGrp;
+
+    /** Host to group map. */
+    private final Map<String, HadoopMapReducePlanGroup> hostToGrp;
+
+    /**
+     * Constructor.
+     *
+     * @param grps All groups.
+     * @param idToGrp ID to group map.
+     * @param hostToGrp Host to group map.
+     */
+    public HadoopMapReducePlanTopology(List<HadoopMapReducePlanGroup> grps,
+        Map<UUID, HadoopMapReducePlanGroup> idToGrp, Map<String, 
HadoopMapReducePlanGroup> hostToGrp) {
+        assert grps != null;
+        assert idToGrp != null;
+        assert hostToGrp != null;
+
+        this.grps = grps;
+        this.idToGrp = idToGrp;
+        this.hostToGrp = hostToGrp;
+    }
+
+    /**
+     * @return All groups.
+     */
+    public List<HadoopMapReducePlanGroup> groups() {
+        return grps;
+    }
+
+    /**
+     * Get group for node ID.
+     *
+     * @param id Node ID.
+     * @return Group.
+     */
+    public HadoopMapReducePlanGroup groupForId(UUID id) {
+        return idToGrp.get(id);
+    }
+
+    /**
+     * Get group for host.
+     *
+     * @param host Host.
+     * @return Group.
+     */
+    @Nullable public HadoopMapReducePlanGroup groupForHost(String host) {
+        return hostToGrp.get(host);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopMapReducePlanTopology.class, this);
+    }
+}

Reply via email to