http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
new file mode 100644
index 0000000..27ffc19
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -0,0 +1,846 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.mapreduce;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import 
org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import 
org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import 
org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup;
+import 
org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Map-reduce planner which assigns mappers and reducers based on their 
"weights". Weight describes how much resources
+ * are required to execute particular map or reduce task.
+ * <p>
+ * Plan creation consists of two steps: assigning mappers and assigning 
reducers.
+ * <p>
+ * Mappers are assigned based on input split data location. For each input 
split we search for nodes where
+ * its data is stored. Planner tries to assign mappers to their affinity nodes 
first. This process is governed by two
+ * properties:
+ * <ul>
+ *     <li><b>{@code localMapperWeight}</b> - weight of a map task when it is 
executed on an affinity node;</li>
+ *     <li><b>{@code remoteMapperWeight}</b> - weight of a map task when it is 
executed on a non-affinity node.</li>
+ * </ul>
+ * The planning algorithm assigns mappers so that the total resulting weight on all nodes is the minimum possible.
+ * <p>
+ * Reducers are assigned differently. First we try to distribute reducers across nodes with mappers. This approach
+ * can minimize expensive data transfer over the network. A reducer assigned to a node with a mapper is considered
+ * <b>{@code local}</b>. Otherwise it is considered <b>{@code remote}</b>. This process continues until a certain
+ * weight threshold is reached, which means that the current node is already too busy and should no longer have
+ * higher priority over other nodes. The threshold can be configured using the
+ * <b>{@code preferLocalReducerThresholdWeight}</b> property.
+ * <p>
+ * When local reducer threshold is reached on all nodes, we distribute 
remaining reducers based on their local and
+ * remote weights in the same way as it is done for mappers. This process is 
governed by two
+ * properties:
+ * <ul>
+ *     <li><b>{@code localReducerWeight}</b> - weight of a reduce task when it 
is executed on a node with mappers;</li>
+ *     <li><b>{@code remoteReducerWeight}</b> - weight of a reduce task when it is executed on a node without
+ *     mappers.</li>
+ * </ul>
+ */
+public class IgniteHadoopWeightedMapReducePlanner extends 
HadoopAbstractMapReducePlanner {
+    /** Default weight of a map task executed on an affinity node. */
+    public static final int DFLT_LOC_MAPPER_WEIGHT = 100;
+
+    /** Default weight of a map task executed on a non-affinity node. */
+    public static final int DFLT_RMT_MAPPER_WEIGHT = 100;
+
+    /** Default weight of a reduce task executed on a node that has mappers. */
+    public static final int DFLT_LOC_REDUCER_WEIGHT = 100;
+
+    /** Default weight of a reduce task executed on a node without mappers. */
+    public static final int DFLT_RMT_REDUCER_WEIGHT = 100;
+
+    /** Default group weight below which reducers are still placed next to their mappers. */
+    public static final int DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT = 200;
+
+    /** Local mapper weight; see {@link #getLocalMapperWeight()}. */
+    private int locMapperWeight = DFLT_LOC_MAPPER_WEIGHT;
+
+    /** Remote mapper weight; see {@link #getRemoteMapperWeight()}. */
+    private int rmtMapperWeight = DFLT_RMT_MAPPER_WEIGHT;
+
+    /** Local reducer weight; see {@link #getLocalReducerWeight()}. */
+    private int locReducerWeight = DFLT_LOC_REDUCER_WEIGHT;
+
+    /** Remote reducer weight; see {@link #getRemoteReducerWeight()}. */
+    private int rmtReducerWeight = DFLT_RMT_REDUCER_WEIGHT;
+
+    /** Reducer migration threshold weight; see {@link #getPreferLocalReducerThresholdWeight()}. */
+    private int preferLocReducerThresholdWeight = 
DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Input splits are sorted first, then mappers are placed according to split affinity and group weights,
+     * and finally reducers are distributed over the same topology.
+     *
+     * @throws IgniteCheckedException If the configured reducer count is negative or mapper assignment failed.
+     */
+    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+        List<HadoopInputSplit> sortedSplits = HadoopUtils.sortInputSplits(job.input());
+
+        int rdcCnt = job.info().reducers();
+
+        if (rdcCnt < 0) {
+            throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt);
+        }
+
+        HadoopMapReducePlanTopology planTop = topology(nodes);
+
+        // Mappers go first: reducer placement depends on where the mappers ended up.
+        Mappers mapperAssignment = assignMappers(sortedSplits, planTop);
+
+        Map<UUID, int[]> reducerAssignment = assignReducers(sortedSplits, planTop, mapperAssignment, rdcCnt);
+
+        return new HadoopDefaultMapReducePlan(mapperAssignment.nodeToSplits, reducerAssignment);
+    }
+
+    /**
+     * Picks a node for the mapper of every input split.
+     *
+     * @param splits Input splits.
+     * @param top Topology.
+     * @return Split-to-node and node-to-splits assignments.
+     * @throws IgniteCheckedException If affinity resolution failed.
+     */
+    private Mappers assignMappers(Collection<HadoopInputSplit> splits,
+        HadoopMapReducePlanTopology top) throws IgniteCheckedException {
+        Mappers assignment = new Mappers();
+
+        for (HadoopInputSplit curSplit : splits) {
+            // Resolve affinity (IGFS when possible), then let the weights decide on the target node.
+            Collection<UUID> affNodeIds = affinityNodesForSplit(curSplit, top);
+
+            UUID target = bestMapperNode(affNodeIds, top);
+
+            assert target != null;
+
+            assignment.add(curSplit, target);
+        }
+
+        return assignment;
+    }
+
+    /**
+     * Get affinity nodes for the given input split.
+     * <p>
+     * IGFS affinity is used when it can be resolved. Otherwise nodes are collected from the topology groups
+     * of the hosts reported by the split.
+     * <p>
+     * Order in the returned collection *is* significant: the first nodes are considered preferable for
+     * scheduling. In the host-based fallback every node is registered with the same length (the split length,
+     * or zero for non-file splits), so the resulting order is defined by node ID only.
+     *
+     * @param split Split.
+     * @param top Topology.
+     * @return Affinity nodes.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, HadoopMapReducePlanTopology top)
+        throws IgniteCheckedException {
+        Collection<UUID> igfsNodeIds = igfsAffinityNodesForSplit(split);
+
+        if (igfsNodeIds != null)
+            return igfsNodeIds;
+
+        // Split length does not depend on the host, so it is computed once instead of on every iteration.
+        long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0L;
+
+        Map<NodeIdAndLength, UUID> res = new TreeMap<>();
+
+        for (String host : split.hosts()) {
+            HadoopMapReducePlanGroup grp = top.groupForHost(host);
+
+            // Hosts without a matching group contribute nothing.
+            if (grp != null) {
+                for (int i = 0; i < grp.nodeCount(); i++) {
+                    UUID nodeId = grp.nodeId(i);
+
+                    res.put(new NodeIdAndLength(nodeId, len), nodeId);
+                }
+            }
+        }
+
+        return new LinkedHashSet<>(res.values());
+    }
+
+    /**
+     * Resolves affinity nodes of a split through IGFS, if the split is a file block stored in an IGFS
+     * instance of this Ignite node.
+     * <p>
+     * Order in the returned collection *is* significant: when several block locations are reported, nodes
+     * holding more data go first, so the first nodes are preferable for scheduling.
+     *
+     * @param split Input split.
+     * @return IGFS affinity or {@code null} if it cannot be resolved through IGFS.
+     * @throws IgniteCheckedException If the IGFS affinity request failed.
+     */
+    @Nullable private Collection<UUID> igfsAffinityNodesForSplit(HadoopInputSplit split)
+        throws IgniteCheckedException {
+        if (!(split instanceof HadoopFileBlock))
+            return null;
+
+        HadoopFileBlock fileBlock = (HadoopFileBlock)split;
+
+        if (!IgniteFileSystem.IGFS_SCHEME.equalsIgnoreCase(fileBlock.file().getScheme()))
+            return null;
+
+        HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(fileBlock.file().getAuthority());
+
+        // Only an endpoint that names this very grid can be served locally.
+        if (!F.eq(ignite.name(), endpoint.grid()))
+            return null;
+
+        IgfsEx igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
+
+        if (igfs == null || igfs.isProxy(fileBlock.file()))
+            return null;
+
+        IgfsPath path = new IgfsPath(fileBlock.file());
+
+        if (!igfs.exists(path))
+            return null;
+
+        Collection<IgfsBlockLocation> locations;
+
+        try {
+            locations = igfs.affinity(path, fileBlock.start(), fileBlock.length());
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException("Failed to get IGFS file block affinity [path=" + path +
+                ", start=" + fileBlock.start() + ", len=" + fileBlock.length() + ']', e);
+        }
+
+        assert locations != null;
+
+        // A single location already carries its node IDs; no aggregation is needed.
+        if (locations.size() == 1)
+            return locations.iterator().next().nodeIds();
+
+        // Sum up the data length hosted by every node across all locations.
+        Map<UUID, Long> lenByNode = new HashMap<>();
+
+        for (IgfsBlockLocation location : locations) {
+            for (UUID nodeId : location.nodeIds()) {
+                Long prevLen = lenByNode.get(nodeId);
+
+                long total = location.length();
+
+                if (prevLen != null)
+                    total += prevLen;
+
+                lenByNode.put(nodeId, total);
+            }
+        }
+
+        // Natural ordering of the key puts nodes with more data first.
+        Map<NodeIdAndLength, UUID> sorted = new TreeMap<>();
+
+        for (Map.Entry<UUID, Long> e : lenByNode.entrySet())
+            sorted.put(new NodeIdAndLength(e.getKey(), e.getValue()), e.getKey());
+
+        return new LinkedHashSet<>(sorted.values());
+    }
+
+    /**
+     * Chooses a node for a mapper. The group with the least resulting weight wins; on equal weights the group
+     * with the higher affinity priority wins. The weight of the chosen group is updated as a side effect.
+     *
+     * @param affIds Affinity node IDs.
+     * @param top Topology.
+     * @return ID of the chosen node.
+     */
+    private UUID bestMapperNode(@Nullable Collection<UUID> affIds, HadoopMapReducePlanTopology top) {
+        // The first affinity node is the preferred one.
+        UUID primaryAffId = F.first(affIds);
+
+        HadoopMapReducePlanGroup bestGrp = null;
+        MapperPriority bestPrio = MapperPriority.NORMAL;
+        int bestWeight = Integer.MAX_VALUE;
+
+        for (HadoopMapReducePlanGroup candidate : top.groups()) {
+            MapperPriority candPrio = groupPriority(candidate, affIds, primaryAffId);
+
+            // Groups without affinity nodes pay the remote price, all others pay the local one.
+            int extra = candPrio == MapperPriority.NORMAL ? rmtMapperWeight : locMapperWeight;
+
+            int candWeight = candidate.weight() + extra;
+
+            boolean lighter = candWeight < bestWeight;
+            boolean sameWeightBetterPrio = candWeight == bestWeight && candPrio.value() > bestPrio.value();
+
+            if (bestGrp == null || lighter || sameWeightBetterPrio) {
+                bestGrp = candidate;
+                bestPrio = candPrio;
+                bestWeight = candWeight;
+            }
+        }
+
+        assert bestGrp != null;
+
+        // Remember the new weight so that subsequent assignments see it.
+        bestGrp.weight(bestWeight);
+
+        return bestMapperNodeForGroup(bestGrp, bestPrio, affIds, primaryAffId);
+    }
+
+    /**
+     * Chooses a node inside the given group according to the group priority.
+     *
+     * @param grp Group.
+     * @param priority Priority of the group for the split being assigned.
+     * @param affIds Affinity IDs.
+     * @param prioAffId Priority affinity ID.
+     * @return Best node ID in the group.
+     */
+    private static UUID bestMapperNodeForGroup(HadoopMapReducePlanGroup grp, MapperPriority priority,
+        @Nullable Collection<UUID> affIds, @Nullable UUID prioAffId) {
+        // Usual case: one node per host, nothing to choose from.
+        if (grp.single())
+            return grp.nodeId(0);
+
+        // Rare case: several nodes are started on the same host.
+        switch (priority) {
+            case NORMAL:
+                // Any node will do.
+                return grp.nodeId(ThreadLocalRandom.current().nextInt(grp.nodeCount()));
+
+            case HIGH: {
+                // Any affinity node will do.
+                assert affIds != null;
+
+                List<Integer> affIdxs = new ArrayList<>();
+
+                for (int i = 0; i < grp.nodeCount(); i++) {
+                    if (affIds.contains(grp.nodeId(i)))
+                        affIdxs.add(i);
+                }
+
+                int pick = affIdxs.get(ThreadLocalRandom.current().nextInt(affIdxs.size()));
+
+                return grp.nodeId(pick);
+            }
+
+            default: {
+                // Look for the primary affinity node; fall back to the first node of the group.
+                assert prioAffId != null;
+
+                int primaryIdx = 0;
+
+                for (int i = 0; i < grp.nodeCount(); i++) {
+                    if (F.eq(grp.nodeId(i), prioAffId)) {
+                        primaryIdx = i;
+
+                        break;
+                    }
+                }
+
+                assert priority == MapperPriority.HIGHEST;
+
+                return grp.nodeId(primaryIdx);
+            }
+        }
+    }
+
+    /**
+     * Assigns reducers to nodes and gives every reducer a sequential index starting from zero.
+     *
+     * @param splits Input splits.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Map from node ID to the indexes of reducers assigned to that node.
+     */
+    private Map<UUID, int[]> assignReducers(Collection<HadoopInputSplit> splits, HadoopMapReducePlanTopology top,
+        Mappers mappers, int reducerCnt) {
+        Map<UUID, Integer> cntPerNode = assignReducers0(top, splits, mappers, reducerCnt);
+
+        Map<UUID, int[]> idxsPerNode = new HashMap<>(cntPerNode.size());
+
+        int nextIdx = 0;
+
+        for (Map.Entry<UUID, Integer> e : cntPerNode.entrySet()) {
+            int[] idxs = new int[e.getValue()];
+
+            int pos = 0;
+
+            while (pos < idxs.length)
+                idxs[pos++] = nextIdx++;
+
+            idxsPerNode.put(e.getKey(), idxs);
+        }
+
+        assert reducerCnt == nextIdx : reducerCnt + " != " + nextIdx;
+
+        return idxsPerNode;
+    }
+
+    /**
+     * Calculates how many reducers every node gets.
+     * <p>
+     * Reducers are first spread between splits and placed next to the mapper of their split while the local
+     * threshold allows. Whatever could not be placed that way is distributed by group weight.
+     * <p>
+     * When there are no splits at all, the per-split step is skipped (it would otherwise divide by zero) and
+     * all reducers are distributed by group weight.
+     *
+     * @param top Topology.
+     * @param splits Input splits.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Map from node ID to the number of reducers assigned to that node.
+     */
+    private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology top, Collection<HadoopInputSplit> splits,
+        Mappers mappers, int reducerCnt) {
+        Map<UUID, Integer> res = new HashMap<>();
+
+        int remaining = 0;
+
+        if (splits.isEmpty()) {
+            // No splits means no mappers to be local to, so nothing can be assigned per split.
+            remaining = reducerCnt;
+        }
+        else {
+            // Assign reducers to splits.
+            Map<HadoopInputSplit, Integer> splitToReducerCnt = assignReducersToSplits(splits, reducerCnt);
+
+            // Assign as many local reducers as possible.
+            for (Map.Entry<HadoopInputSplit, Integer> entry : splitToReducerCnt.entrySet()) {
+                HadoopInputSplit split = entry.getKey();
+                int cnt = entry.getValue();
+
+                if (cnt > 0) {
+                    int assigned = assignLocalReducers(split, cnt, top, mappers, res);
+
+                    assert assigned <= cnt;
+
+                    remaining += cnt - assigned;
+                }
+            }
+        }
+
+        // Assign the rest of the reducers.
+        if (remaining > 0)
+            assignRemoteReducers(remaining, top, mappers, res);
+
+        return res;
+    }
+
+    /**
+     * Places reducers of a split on the node running the mapper of that split, for as long as the weight of
+     * the node's group stays below the local reducer threshold.
+     *
+     * @param split Split.
+     * @param cnt Number of reducers to place at most.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param resMap Reducers result map, updated in place.
+     * @return Number of locally assigned reducers.
+     */
+    private int assignLocalReducers(HadoopInputSplit split, int cnt, HadoopMapReducePlanTopology top,
+        Mappers mappers, Map<UUID, Integer> resMap) {
+        UUID mapperNodeId = mappers.splitToNode.get(split);
+
+        assert mapperNodeId != null;
+
+        HadoopMapReducePlanGroup mapperGrp = top.groupForId(mapperNodeId);
+
+        assert mapperGrp != null;
+
+        int assigned = 0;
+
+        // Every placed reducer makes the group heavier until the threshold stops the loop.
+        for (; assigned < cnt && mapperGrp.weight() < preferLocReducerThresholdWeight; assigned++)
+            mapperGrp.weight(mapperGrp.weight() + locReducerWeight);
+
+        if (assigned == 0)
+            return 0;
+
+        Integer prevCnt = resMap.get(mapperNodeId);
+
+        resMap.put(mapperNodeId, prevCnt == null ? assigned : prevCnt + assigned);
+
+        return assigned;
+    }
+
+    /**
+     * Distributes reducers by group weight: every reducer goes to the currently least loaded group.
+     * Inside the group a node with mappers is preferred (local reducer weight is charged); if there is none,
+     * any node of the group is taken (remote reducer weight is charged).
+     *
+     * @param cnt Number of reducers to assign.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param resMap Reducers result map, updated in place.
+     */
+    private void assignRemoteReducers(int cnt, HadoopMapReducePlanTopology top, Mappers mappers,
+        Map<UUID, Integer> resMap) {
+        TreeSet<HadoopMapReducePlanGroup> byWeight = new TreeSet<>(new GroupWeightComparator());
+
+        byWeight.addAll(top.groups());
+
+        for (int left = cnt; left > 0; left--) {
+            HadoopMapReducePlanGroup lightest = byWeight.first();
+
+            // Collect nodes of the group which already run mappers.
+            List<UUID> mapperNodeIds = null;
+
+            for (int i = 0; i < lightest.nodeCount(); i++) {
+                UUID candidate = lightest.nodeId(i);
+
+                if (!mappers.nodeToSplits.containsKey(candidate))
+                    continue;
+
+                if (mapperNodeIds == null)
+                    mapperNodeIds = new ArrayList<>(2);
+
+                mapperNodeIds.add(candidate);
+            }
+
+            boolean hasMappers = mapperNodeIds != null;
+
+            UUID target = hasMappers ?
+                mapperNodeIds.get(ThreadLocalRandom.current().nextInt(mapperNodeIds.size())) :
+                lightest.nodeId(ThreadLocalRandom.current().nextInt(lightest.nodeCount()));
+
+            int updatedWeight = lightest.weight() + (hasMappers ? locReducerWeight : rmtReducerWeight);
+
+            // The group must leave the sorted set before its weight changes, otherwise it could not be found.
+            boolean removed = byWeight.remove(lightest);
+
+            assert removed;
+
+            lightest.weight(updatedWeight);
+
+            boolean added = byWeight.add(lightest);
+
+            assert added;
+
+            Integer prevCnt = resMap.get(target);
+
+            resMap.put(target, prevCnt == null ? 1 : prevCnt + 1);
+        }
+    }
+
+    /**
+     * Comparator ordering groups by ascending weight. Groups with equal weights are ordered by the value
+     * returned from {@code macs()}.
+     */
+    private static class GroupWeightComparator implements Comparator<HadoopMapReducePlanGroup> {
+        /** {@inheritDoc} */
+        @Override public int compare(HadoopMapReducePlanGroup first, HadoopMapReducePlanGroup second) {
+            // Integer.compare is used instead of subtraction, which may overflow and flip the sign.
+            int res = Integer.compare(first.weight(), second.weight());
+
+            return res != 0 ? res : first.macs().compareTo(second.macs());
+        }
+    }
+
+    /**
+     * Spreads reducers between splits as evenly as possible: every split gets the integer quotient, and the
+     * first splits in iteration order get one extra reducer each until the remainder is used up.
+     * <p>
+     * The number of splits is used as a divisor, so the collection must not be empty.
+     *
+     * @param splits Splits.
+     * @param reducerCnt Reducer count.
+     * @return Map from input split to reducer count.
+     */
+    private Map<HadoopInputSplit, Integer> assignReducersToSplits(Collection<HadoopInputSplit> splits,
+        int reducerCnt) {
+        Map<HadoopInputSplit, Integer> cntBySplit = new IdentityHashMap<>(splits.size());
+
+        int perSplit = reducerCnt / splits.size();
+        int extra = reducerCnt % splits.size();
+
+        for (HadoopInputSplit split : splits) {
+            if (extra > 0) {
+                extra--;
+
+                cntBySplit.put(split, perSplit + 1);
+            }
+            else
+                cntBySplit.put(split, perSplit);
+        }
+
+        assert extra == 0;
+
+        return cntBySplit;
+    }
+
+    /**
+     * Calculates priority of a group for a split with the given affinity nodes: {@code HIGHEST} if the group
+     * contains the priority affinity node, {@code HIGH} if it contains any other affinity node and
+     * {@code NORMAL} otherwise.
+     *
+     * @param grp Group.
+     * @param affIds Affinity IDs.
+     * @param prioAffId Priority affinity ID.
+     * @return Group priority.
+     */
+    private static MapperPriority groupPriority(HadoopMapReducePlanGroup grp, @Nullable Collection<UUID> affIds,
+        @Nullable UUID prioAffId) {
+        assert F.isEmpty(affIds) ? prioAffId == null : prioAffId == F.first(affIds);
+        assert grp != null;
+
+        if (F.isEmpty(affIds))
+            return MapperPriority.NORMAL;
+
+        boolean hasAffNode = false;
+
+        for (int i = 0; i < grp.nodeCount(); i++) {
+            UUID nodeId = grp.nodeId(i);
+
+            if (!affIds.contains(nodeId))
+                continue;
+
+            // Nothing can beat the priority affinity node, so stop right away.
+            if (F.eq(prioAffId, nodeId))
+                return MapperPriority.HIGHEST;
+
+            hasAffNode = true;
+        }
+
+        return hasAffNode ? MapperPriority.HIGH : MapperPriority.NORMAL;
+    }
+
+    /**
+     * Get local mapper weight. This weight is added to a node when a mapper is assigned and its input split
+     * data is located on this node (at least partially).
+     * <p>
+     * Defaults to {@link #DFLT_LOC_MAPPER_WEIGHT}.
+     *
+     * @return Local mapper weight.
+     */
+    public int getLocalMapperWeight() {
+        return locMapperWeight;
+    }
+
+    /**
+     * Set local mapper weight. See {@link #getLocalMapperWeight()} for more information.
+     * The value is stored as is, without validation.
+     *
+     * @param locMapperWeight Local mapper weight.
+     */
+    public void setLocalMapperWeight(int locMapperWeight) {
+        this.locMapperWeight = locMapperWeight;
+    }
+
+    /**
+     * Get remote mapper weight. This weight is added to a node when a mapper is assigned, but its input
+     * split data is not located on this node.
+     * <p>
+     * Defaults to {@link #DFLT_RMT_MAPPER_WEIGHT}.
+     *
+     * @return Remote mapper weight.
+     */
+    public int getRemoteMapperWeight() {
+        return rmtMapperWeight;
+    }
+
+    /**
+     * Set remote mapper weight. See {@link #getRemoteMapperWeight()} for more information.
+     * The value is stored as is, without validation.
+     *
+     * @param rmtMapperWeight Remote mapper weight.
+     */
+    public void setRemoteMapperWeight(int rmtMapperWeight) {
+        this.rmtMapperWeight = rmtMapperWeight;
+    }
+
+    /**
+     * Get local reducer weight. This weight is added to a node when a reducer is assigned and the node has
+     * at least one assigned mapper.
+     * <p>
+     * Defaults to {@link #DFLT_LOC_REDUCER_WEIGHT}.
+     *
+     * @return Local reducer weight.
+     */
+    public int getLocalReducerWeight() {
+        return locReducerWeight;
+    }
+
+    /**
+     * Set local reducer weight. See {@link #getLocalReducerWeight()} for more information.
+     * The value is stored as is, without validation.
+     *
+     * @param locReducerWeight Local reducer weight.
+     */
+    public void setLocalReducerWeight(int locReducerWeight) {
+        this.locReducerWeight = locReducerWeight;
+    }
+
+    /**
+     * Get remote reducer weight. This weight is added to a node when a reducer is assigned, but the node
+     * doesn't have any assigned mappers.
+     * <p>
+     * Defaults to {@link #DFLT_RMT_REDUCER_WEIGHT}.
+     *
+     * @return Remote reducer weight.
+     */
+    public int getRemoteReducerWeight() {
+        return rmtReducerWeight;
+    }
+
+    /**
+     * Set remote reducer weight. See {@link #getRemoteReducerWeight()} for more information.
+     * The value is stored as is, without validation.
+     *
+     * @param rmtReducerWeight Remote reducer weight.
+     */
+    public void setRemoteReducerWeight(int rmtReducerWeight) {
+        this.rmtReducerWeight = rmtReducerWeight;
+    }
+
+    /**
+     * Get reducer migration threshold weight. When the threshold is reached, a node with mappers is no
+     * longer considered as preferred for further reducer assignments.
+     * <p>
+     * Defaults to {@link #DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT}.
+     *
+     * @return Reducer migration threshold weight.
+     */
+    public int getPreferLocalReducerThresholdWeight() {
+        return preferLocReducerThresholdWeight;
+    }
+
+    /**
+     * Set reducer migration threshold weight. See {@link #getPreferLocalReducerThresholdWeight()} for more
+     * information. The value is stored as is, without validation.
+     *
+     * @param reducerMigrationThresholdWeight Reducer migration threshold weight.
+     */
+    public void setPreferLocalReducerThresholdWeight(int 
reducerMigrationThresholdWeight) {
+        this.preferLocReducerThresholdWeight = reducerMigrationThresholdWeight;
+    }
+
+    /** {@inheritDoc} The string is produced by {@code S.toString} for this planner class. */
+    @Override public String toString() {
+        return S.toString(IgniteHadoopWeightedMapReducePlanner.class, this);
+    }
+
+    /**
+     * Node ID and length.
+     */
+    private static class NodeIdAndLength implements 
Comparable<NodeIdAndLength> {
+        /** Node ID. */
+        private final UUID id;
+
+        /** Length. */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param id Node ID.
+         * @param len Length.
+         */
+        public NodeIdAndLength(UUID id, long len) {
+            this.id = id;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("NullableProblems")
+        @Override public int compareTo(NodeIdAndLength obj) {
+            long res = len - obj.len;
+
+            if (res > 0)
+                return -1;
+            else if (res < 0)
+                return 1;
+            else
+                return id.compareTo(obj.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof NodeIdAndLength && F.eq(id, 
((NodeIdAndLength)obj).id);
+        }
+    }
+
+    /**
+     * Result of mapper assignment, indexed in both directions.
+     */
+    private static class Mappers {
+        /** Splits assigned to every node. */
+        private final Map<UUID, Collection<HadoopInputSplit>> nodeToSplits = new HashMap<>();
+
+        /** Node chosen for every split; splits are matched by identity. */
+        private final Map<HadoopInputSplit, UUID> splitToNode = new IdentityHashMap<>();
+
+        /**
+         * Registers the split as assigned to the node in both maps.
+         *
+         * @param split Split.
+         * @param node Node.
+         */
+        public void add(HadoopInputSplit split, UUID node) {
+            Collection<HadoopInputSplit> assigned = nodeToSplits.get(node);
+
+            // First split for this node: create its collection lazily.
+            if (assigned == null)
+                nodeToSplits.put(node, assigned = new HashSet<>());
+
+            assigned.add(split);
+
+            splitToNode.put(split, node);
+        }
+    }
+
+    /**
+     * Mapper priority enumeration. The numeric value grows with the priority and is used to break ties
+     * between groups of equal weight.
+     */
+    private enum MapperPriority {
+        /** Normal node: the group contains no affinity nodes for the split. */
+        NORMAL(0),
+
+        /** (likely) Affinity node. */
+        HIGH(1),
+
+        /**
+         * (likely) Affinity node with the highest priority (e.g. because it hosts more data than other
+         * nodes).
+         */
+        HIGHEST(2);
+
+        /** Numeric value of the priority. */
+        private final int val;
+
+        /**
+         * Constructor.
+         *
+         * @param val Numeric value of the priority.
+         */
+        MapperPriority(int val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Numeric value of the priority.
+         */
+        public int value() {
+            return val;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
new file mode 100644
index 0000000..7635b9e
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Ignite Hadoop Accelerator map-reduce classes.
+ */
+package org.apache.ignite.hadoop.mapreduce;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/package-info.java 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/package-info.java
new file mode 100644
index 0000000..8a5ae34
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Ignite Hadoop Accelerator API.
+ */
+package org.apache.ignite.hadoop;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/BasicUserNameMapper.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/BasicUserNameMapper.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/BasicUserNameMapper.java
new file mode 100644
index 0000000..c34808a
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/BasicUserNameMapper.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.util;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Map;
+
+/**
+ * Name mapper which maps one user name to another based on predefined dictionary. If name is not found in the
+ * dictionary, or dictionary is not defined, either passed user name or some default value could be returned.
+ */
+public class BasicUserNameMapper implements UserNameMapper {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Mappings. */
+    private Map<String, String> mappings;
+
+    /** Whether to use default user name. */
+    private boolean useDfltUsrName;
+
+    /** Default user name. */
+    private String dfltUsrName;
+
+    /** {@inheritDoc} Unmapped names resolve to the default user name when enabled, otherwise to themselves. */
+    @Nullable @Override public String map(String name) {
+        String res = mappings != null ? mappings.get(name) : null;
+
+        return res != null ? res : useDfltUsrName ? dfltUsrName : name;
+    }
+
+    /**
+     * Get mappings.
+     *
+     * @return Mappings.
+     */
+    @Nullable public Map<String, String> getMappings() {
+        return mappings;
+    }
+
+    /**
+     * Set mappings.
+     *
+     * @param mappings Mappings.
+     */
+    public void setMappings(@Nullable Map<String, String> mappings) {
+        this.mappings = mappings;
+    }
+
+    /**
+     * Get whether to use default user name when there is no mapping for current user name.
+     *
+     * @return Whether to use default user name.
+     */
+    public boolean isUseDefaultUserName() {
+        return useDfltUsrName;
+    }
+
+    /**
+     * Set whether to use default user name when there is no mapping for current user name.
+     *
+     * @param useDfltUsrName Whether to use default user name.
+     */
+    public void setUseDefaultUserName(boolean useDfltUsrName) {
+        this.useDfltUsrName = useDfltUsrName;
+    }
+
+    /**
+     * Get default user name (optional).
+     * <p>
+     * This user name will be used if provided mappings don't contain a mapping for the given user name and
+     * {@link #isUseDefaultUserName()} is set to {@code true}.
+     * <p>
+     * Defaults to {@code null}.
+     *
+     * @return Default user name.
+     */
+    @Nullable public String getDefaultUserName() {
+        return dfltUsrName;
+    }
+
+    /**
+     * Set default user name (optional). See {@link #getDefaultUserName()} for more information.
+     *
+     * @param dfltUsrName Default user name.
+     */
+    public void setDefaultUserName(@Nullable String dfltUsrName) {
+        this.dfltUsrName = dfltUsrName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(BasicUserNameMapper.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/ChainedUserNameMapper.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/ChainedUserNameMapper.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/ChainedUserNameMapper.java
new file mode 100644
index 0000000..7635e25
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/ChainedUserNameMapper.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.util;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * Chained user name mapper. Passes the name through child mappers in order, each one receiving the previous result.
+ */
+public class ChainedUserNameMapper implements UserNameMapper, LifecycleAware {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Child mappers, applied in array order. */
+    private UserNameMapper[] mappers;
+
+    /** {@inheritDoc} Applies every child mapper in array order to the result of the previous one. */
+    @Nullable @Override public String map(String name) {
+        for (UserNameMapper mapper : mappers)
+            name = mapper.map(name);
+
+        return name;
+    }
+
+    /** {@inheritDoc} Validates the mappers array, then starts every {@link LifecycleAware} child. */
+    @Override public void start() throws IgniteException {
+        if (mappers == null)
+            throw new IgniteException("Mappers cannot be null.");
+
+        for (int i = 0; i < mappers.length; i++) {
+            if (mappers[i] == null)
+                throw new IgniteException("Mapper cannot be null [index=" + i 
+ ']');
+        }
+
+        for (UserNameMapper mapper : mappers) {
+            if (mapper instanceof LifecycleAware)
+                ((LifecycleAware)mapper).start();
+        }
+    }
+
+    /** {@inheritDoc} Stops every {@link LifecycleAware} child mapper. */
+    @Override public void stop() throws IgniteException {
+        assert mappers != null;
+
+        for (UserNameMapper mapper : mappers) {
+            if (mapper instanceof LifecycleAware)
+                ((LifecycleAware)mapper).stop();
+        }
+    }
+
+    /**
+     * Get child mappers.
+     *
+     * @return Child mappers (the internal array itself, not a copy).
+     */
+    public UserNameMapper[] getMappers() {
+        return mappers;
+    }
+
+    /**
+     * Set child mappers.
+     *
+     * @param mappers Child mappers; must be non-null with no null elements by the time {@link #start()} is called.
+     */
+    public void setMappers(UserNameMapper... mappers) {
+        this.mappers = mappers;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ChainedUserNameMapper.class, this,
+            "mappers", mappers != null ? Arrays.toString(mappers) : null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/KerberosUserNameMapper.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/KerberosUserNameMapper.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/KerberosUserNameMapper.java
new file mode 100644
index 0000000..433fb82
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/KerberosUserNameMapper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.util;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Kerberos user name mapper. Use it when you need to map simple user name to Kerberos principal.
+ * E.g. from {@code johndoe} to {@code johndoe@your.realm.com} or {@code johndoe/admin@your.realm.com}.
+ */
+public class KerberosUserNameMapper implements UserNameMapper, LifecycleAware {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Instance. */
+    private String instance;
+
+    /** Realm. */
+    private String realm;
+
+    /** Output format selector; assigned in {@link #start()} from the instance and realm settings. */
+    private volatile State state;
+
+    /** {@inheritDoc} Expects {@link #start()} to have run; the name first goes through {@code fixUserName}. */
+    @Nullable @Override public String map(String name) {
+        assert state != null;
+
+        name = IgfsUtils.fixUserName(name);
+
+        switch (state) {
+            case NAME:
+                return name;
+
+            case NAME_REALM:
+                return name + '@' + realm;
+
+            case NAME_INSTANCE:
+                return name + '/' + instance;
+
+            default:
+                assert state == State.NAME_INSTANCE_REALM;
+
+                return name + '/' + instance + '@' + realm;
+        }
+    }
+
+    /** {@inheritDoc} Selects the output format based on which of instance and realm pass {@code F.isEmpty}. */
+    @Override public void start() throws IgniteException {
+        if (!F.isEmpty(instance))
+            state = F.isEmpty(realm) ? State.NAME_INSTANCE : 
State.NAME_INSTANCE_REALM;
+        else
+            state = F.isEmpty(realm) ? State.NAME : State.NAME_REALM;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws IgniteException {
+        // No-op.
+    }
+
+    /**
+     * Get Kerberos instance (optional).
+     *
+     * @return Instance.
+     */
+    @Nullable public String getInstance() {
+        return instance;
+    }
+
+    /**
+     * Set Kerberos instance (optional).
+     *
+     * @param instance Kerberos instance.
+     */
+    public void setInstance(@Nullable String instance) {
+        this.instance = instance;
+    }
+
+    /**
+     * Get Kerberos realm (optional).
+     *
+     * @return Kerberos realm.
+     */
+    @Nullable public String getRealm() {
+        return realm;
+    }
+
+    /**
+     * Set Kerberos realm (optional).
+     *
+     * @param realm Kerberos realm.
+     */
+    public void setRealm(@Nullable String realm) {
+        this.realm = realm;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(KerberosUserNameMapper.class, this);
+    }
+
+    /**
+     * State enumeration: which parts are appended to the user name by {@link #map(String)}.
+     */
+    private enum State {
+        /** Name only. */
+        NAME,
+
+        /** Name and realm. */
+        NAME_REALM,
+
+        /** Name and instance. */
+        NAME_INSTANCE,
+
+        /** Name, instance and realm. */
+        NAME_INSTANCE_REALM,
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
new file mode 100644
index 0000000..26dc4b2
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.util;
+
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.Serializable;
+
+/**
+ * Hadoop file system name mapper. Used by {@link HadoopFileSystemFactory} implementation to pass proper user names
+ * to the underlying Hadoop file system.
+ */
+public interface UserNameMapper extends Serializable {
+    /**
+     * Maps the given user name to the name that should be used instead.
+     *
+     * @param name User name.
+     * @return Mapped user name; may be {@code null}.
+     */
+    @Nullable public String map(String name);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/package-info.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/package-info.java
new file mode 100644
index 0000000..d84c0ba
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Ignite Hadoop Accelerator utility classes.
+ */
+package org.apache.ignite.hadoop.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
new file mode 100644
index 0000000..23eaa18
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+
+/**
+ * Hadoop attributes of a node, read from its {@link #NAME} attribute by {@link #forNode(ClusterNode)}.
+ */
+public class HadoopAttributes implements Externalizable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Attribute name. */
+    public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + 
".hadoop";
+
+    /** Map-reduce planner class name. */
+    private String plannerCls;
+
+    /** External executor flag; the configuration-based constructor always sets it to {@code false}. */
+    private boolean extExec;
+
+    /** Maximum parallel tasks. */
+    private int maxParallelTasks;
+
+    /** Maximum task queue size. */
+    private int maxTaskQueueSize;
+
+    /** Native library names; excluded from the generated string and added explicitly in {@link #toString()}. */
+    @GridToStringExclude
+    private String[] libNames;
+
+    /** Number of cores, taken from {@link Runtime#availableProcessors()} by the configuration-based constructor. */
+    private int cores;
+
+    /**
+     * Get attributes for node (if any).
+     *
+     * @param node Node.
+     * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node.
+     */
+    @Nullable public static HadoopAttributes forNode(ClusterNode node) {
+        return node.attribute(NAME);
+    }
+
+    /**
+     * {@link Externalizable} support.
+     */
+    public HadoopAttributes() {
+        // No-op.
+    }
+
+    /**
+     * Constructor. Captures planner class name, task limits, native library names and the local core count.
+     *
+     * @param cfg Configuration; must be non-null and have a map-reduce planner set.
+     */
+    public HadoopAttributes(HadoopConfiguration cfg) {
+        assert cfg != null;
+        assert cfg.getMapReducePlanner() != null;
+
+        plannerCls = cfg.getMapReducePlanner().getClass().getName();
+
+        // TODO: IGNITE-404: Get from configuration when fixed.
+        extExec = false;
+
+        maxParallelTasks = cfg.getMaxParallelTasks();
+        maxTaskQueueSize = cfg.getMaxTaskQueueSize();
+        libNames = cfg.getNativeLibraryNames();
+
+        // Cores count already passed in other attributes, we add it here for 
convenience.
+        cores = Runtime.getRuntime().availableProcessors();
+    }
+
+    /**
+     * @return Map reduce planner class name.
+     */
+    public String plannerClassName() {
+        return plannerCls;
+    }
+
+    /**
+     * @return External execution flag.
+     */
+    public boolean externalExecution() {
+        return extExec;
+    }
+
+    /**
+     * @return Maximum parallel tasks.
+     */
+    public int maxParallelTasks() {
+        return maxParallelTasks;
+    }
+
+    /**
+     * @return Maximum task queue size.
+     */
+    public int maxTaskQueueSize() {
+        return maxTaskQueueSize;
+    }
+
+
+    /**
+     * @return Native library names (the internal array itself, not a copy).
+     */
+    public String[] nativeLibraryNames() {
+        return libNames;
+    }
+
+    /**
+     * @return Number of cores on machine.
+     */
+    public int cores() {
+        return cores;
+    }
+
+    /** {@inheritDoc} Field order must match {@link #readExternal(ObjectInput)}. */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(plannerCls);
+        out.writeBoolean(extExec);
+        out.writeInt(maxParallelTasks);
+        out.writeInt(maxTaskQueueSize);
+        out.writeObject(libNames);
+        out.writeInt(cores);
+    }
+
+    /** {@inheritDoc} Reads fields in the order {@link #writeExternal(ObjectOutput)} wrote them. */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        plannerCls = (String)in.readObject();
+        extExec = in.readBoolean();
+        maxParallelTasks = in.readInt();
+        maxTaskQueueSize = in.readInt();
+        libNames = (String[])in.readObject();
+        cores = in.readInt();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopAttributes.class, this, "libNames", 
Arrays.toString(libNames));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
new file mode 100644
index 0000000..30a6e72
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.ClassCache;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Class loader allowing explicitly load classes without delegation to parent 
class loader.
+ * Also supports class parsing for finding dependencies which contain 
transitive dependencies
+ * unavailable for parent.
+ */
+public class HadoopClassLoader extends URLClassLoader implements ClassCache {
+    static {
+        // We are very parallel capable.
+        registerAsParallelCapable();
+    }
+
+    /** Hadoop class name: Daemon. */
+    public static final String CLS_DAEMON = "org.apache.hadoop.util.Daemon";
+
+    /** Hadoop class name: ShutdownHookManager. */
+    public static final String CLS_SHUTDOWN_HOOK_MANAGER = 
"org.apache.hadoop.util.ShutdownHookManager";
+
+    /** Hadoop class name: NativeCodeLoader. */
+    public static final String CLS_NATIVE_CODE_LOADER = 
"org.apache.hadoop.util.NativeCodeLoader";
+
+    /** Ignite class name: Daemon replacement. */
+    public static final String CLS_DAEMON_REPLACE = 
"org.apache.ignite.internal.processors.hadoop.v2.HadoopDaemon";
+
+    /** Ignite class name: ShutdownHookManager replacement. */
+    public static final String CLS_SHUTDOWN_HOOK_MANAGER_REPLACE =
+        
"org.apache.ignite.internal.processors.hadoop.v2.HadoopShutdownHookManager";
+
+    /** Substring used to recognize the libhadoop library among loaded native library names. */
+    private static final String LIBHADOOP = "hadoop.";
+
+    /** Loader of this class. NOTE(review): the cast throws if it is not a URLClassLoader (e.g. Java 9+ app loader). */
+    private static final URLClassLoader APP_CLS_LDR = 
(URLClassLoader)HadoopClassLoader.class.getClassLoader();
+
+    /** URLs reported by {@link #APP_CLS_LDR}. */
+    private static final Collection<URL> appJars = 
F.asList(APP_CLS_LDR.getURLs());
+
+    /** Hadoop jar URLs. NOTE(review): not assigned in the visible code, presumably filled lazily; confirm. */
+    private static volatile Collection<URL> hadoopJars;
+
+    /** Byte arrays cached by string key. NOTE(review): presumably class bytes by class name; confirm. */
+    private static final Map<String, byte[]> bytesCache = new 
ConcurrentHashMap8<>();
+
+    /** Class cache. */
+    private final ConcurrentMap<String, Class> cacheMap = new 
ConcurrentHashMap<>();
+
+    /** Diagnostic name of this class loader. */
+    @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
+    private final String name;
+
+    /** Native library names. */
+    private final String[] libNames;
+
+    /**
+     * Gets name for the job class loader: {@code hadoop-job-node-} followed by the local node id.
+     * @param locNodeId The local node id; must not be {@code null}.
+     * @return The class loader name.
+     */
+    public static String nameForJob(UUID locNodeId) {
+        return "hadoop-job-node-" + locNodeId.toString();
+    }
+
+    /**
+     * Gets name for the task class loader. Task class loader
+     * @param info The task info.
+     * @param prefix Get only prefix (without task type and number)
+     * @return The class loader name.
+     */
+    public static String nameForTask(HadoopTaskInfo info, boolean prefix) {
+        if (prefix)
+            return "hadoop-task-" + info.jobId() + "-";
+        else
+            return "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + 
info.taskNumber();
+    }
+
+    /**
+     * Constructor. The application class loader is always used as parent; native libraries are linked last.
+     *
+     * @param urls Urls; passed through {@code addHadoopUrls} before being handed to the parent constructor.
+     * @param name Classloader name.
+     * @param libNames Optional additional native library names to be linked from parent classloader.
+     */
+    public HadoopClassLoader(URL[] urls, String name, @Nullable String[] 
libNames) {
+        super(addHadoopUrls(urls), APP_CLS_LDR);
+
+        assert !(getParent() instanceof HadoopClassLoader);
+
+        this.name = name;
+        this.libNames = libNames;
+
+        initializeNativeLibraries();
+    }
+
+    /**
+     * Workaround to load native Hadoop libraries. Java doesn't allow native libraries to be loaded from different
+     * classloaders. But we load Hadoop classes many times and one of these classes - {@code NativeCodeLoader} - tries
+     * to load the same native library over and over again.
+     * <p>
+     * To fix the problem, we force native library load in parent class loader and then "link" handle to this native
+     * library to our class loader. As a result, our class loader will think that the library is already loaded and will
+     * be able to link native methods.
+     * <p>
+     * Any failure is reported as a warning and otherwise ignored.
+     *
+     * @see <a href="http://docs.oracle.com/javase/1.5.0/docs/guide/jni/spec/invocation.html#library_version">
+     *     JNI specification</a>
+     */
+    private void initializeNativeLibraries() {
+        try {
+            // This must trigger native library load.
+            // TODO: Do not delegate to APP LDR
+            Class.forName(CLS_NATIVE_CODE_LOADER, true, APP_CLS_LDR);
+
+            // Reflective access to this loader's own "nativeLibraries" field.
+            final Vector<Object> curVector = U.field(this, "nativeLibraries");
+
+            // TODO: Do not delegate to APP LDR
+            ClassLoader ldr = APP_CLS_LDR;
+
+            // Walk up the parent chain starting from the application class loader.
+            while (ldr != null) {
+                Vector vector = U.field(ldr, "nativeLibraries");
+
+                for (Object lib : vector) {
+                    String name = U.field(lib, "name");
+
+                    // A library qualifies if its name contains "hadoop." or any of the configured libNames.
+                    boolean add = name.contains(LIBHADOOP);
+
+                    if (!add && libNames != null) {
+                        for (String libName : libNames) {
+                            if (libName != null && name.contains(libName)) {
+                                add = true;
+
+                                break;
+                            }
+                        }
+                    }
+
+                    if (add) {
+                        curVector.add(lib);
+
+                        // NOTE(review): returns after the first matching library, so at most one library is linked
+                        // even when several libNames are configured - confirm this is intended.
+                        return;
+                    }
+                }
+
+                ldr = ldr.getParent();
+            }
+        }
+        catch (Exception e) {
+            U.quietAndWarn(null, "Failed to initialize Hadoop native library " 
+
+                "(native Hadoop methods might not work properly): " + e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Class<?> loadClass(String name, boolean resolve) 
throws ClassNotFoundException {
+        try {
+            // Always load Hadoop classes explicitly, since Hadoop can be 
available in App classpath.
+            if (HadoopClassLoaderUtils.isHadoop(name)) {
+                if (name.equals(CLS_SHUTDOWN_HOOK_MANAGER))  // Dirty hack to 
get rid of Hadoop shutdown hooks.
+                    return loadReplace(name, 
CLS_SHUTDOWN_HOOK_MANAGER_REPLACE);
+                else if (name.equals(CLS_DAEMON))
+                    // We replace this in order to be able to forcibly stop 
some daemon threads
+                    // that otherwise never stop (e.g. PeerCache runnables):
+                    return loadReplace(name, CLS_DAEMON_REPLACE);
+
+                return loadClassExplicitly(name, resolve);
+            }
+
+            // For Ignite Hadoop and IGFS classes we have to check if they 
depend on Hadoop.
+            if (HadoopClassLoaderUtils.isHadoopIgfs(name)) {
+                if (hasExternalDependencies(name))
+                    return loadClassExplicitly(name, resolve);
+            }
+
+            return super.loadClass(name, resolve);
+        }
+        catch (NoClassDefFoundError | ClassNotFoundException e) {
+            throw new ClassNotFoundException("Failed to load class: " + name, 
e);
+        }
+    }
+
+    /**
+     * Load a class replacing it with our own implementation. The replacement class bytes are read from the parent
+     * class loader, rewritten to carry the original name and cached per original name.
+     *
+     * @param originalName Name of the class being requested.
+     * @param replaceName Name of the replacement class whose bytes are used.
+     * @return Class defined under {@code originalName}.
+     */
+    private Class<?> loadReplace(final String originalName, final String replaceName) {
+        synchronized (getClassLoadingLock(originalName)) {
+            // First, check if the class has already been loaded
+            Class c = findLoadedClass(originalName);
+
+            if (c != null)
+                return c;
+
+            byte[] bytes = bytesCache.get(originalName);
+
+            if (bytes == null) {
+                // The stream is opened here, so it is closed here as soon as the bytes have been produced.
+                try (InputStream in = HadoopClassLoaderUtils.loadClassBytes(getParent(), replaceName)) {
+                    bytes = HadoopClassLoaderUtils.loadReplace(in, originalName, replaceName);
+                }
+                catch (IOException e) {
+                    throw new RuntimeException("Failed to read replacement class: " + replaceName, e);
+                }
+
+                bytesCache.put(originalName, bytes);
+            }
+
+            return defineClass(originalName, bytes, 0, bytes.length);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Class<?> getFromCache(String clsName) throws ClassNotFoundException {
+        Class<?> cached = cacheMap.get(clsName);
+
+        if (cached != null)
+            return cached;
+
+        Class<?> loaded = Class.forName(clsName, true, this);
+
+        // Another thread may have cached the class in the meantime; its value wins.
+        Class<?> prev = cacheMap.putIfAbsent(clsName, loaded);
+
+        return prev != null ? prev : loaded;
+    }
+
+    /**
+     * Loads the class with this class loader, without delegating to the parent.
+     *
+     * @param name Class name.
+     * @param resolve Resolve class.
+     * @return Class.
+     * @throws ClassNotFoundException If failed.
+     */
+    private Class<?> loadClassExplicitly(String name, boolean resolve) throws ClassNotFoundException {
+        synchronized (getClassLoadingLock(name)) {
+            // Reuse the class if this loader has already loaded it.
+            Class cls = findLoadedClass(name);
+
+            if (cls == null) {
+                long startNanos = System.nanoTime();
+
+                cls = findClass(name);
+
+                // This is the defining class loader; record the stats.
+                // NOTE(review): sun.misc.PerfCounter is a JDK-internal API - confirm it exists on the target JDKs.
+                sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(startNanos);
+                sun.misc.PerfCounter.getFindClasses().increment();
+            }
+
+            if (resolve)
+                resolveClass(cls);
+
+            return cls;
+        }
+    }
+
+    /**
+     * Checks whether the class has external dependencies on Hadoop, resolving class bytes through the parent
+     * class loader.
+     *
+     * @param clsName Class name.
+     * @return {@code True} if class has external dependencies.
+     */
+    boolean hasExternalDependencies(String clsName) {
+        ClassLoader parentLdr = getParent();
+
+        return HadoopClassLoaderUtils.hasExternalDependencies(clsName, parentLdr);
+    }
+
+    /**
+     * @param urls URLs.
+     * @return URLs.
+     */
+    private static URL[] addHadoopUrls(URL[] urls) {
+        Collection<URL> hadoopJars;
+
+        try {
+            hadoopJars = hadoopUrls();
+        }
+        catch (IgniteCheckedException e) {
+            throw new RuntimeException(e);
+        }
+
+        ArrayList<URL> list = new ArrayList<>(hadoopJars.size() + 
appJars.size() + (urls == null ? 0 : urls.length));
+
+        list.addAll(appJars);
+        list.addAll(hadoopJars);
+
+        if (!F.isEmpty(urls))
+            list.addAll(F.asList(urls));
+
+        return list.toArray(new URL[list.size()]);
+    }
+
+    /**
+     * Returns Hadoop JAR URLs, resolving them on the first call and reusing the result afterwards.
+     *
+     * @return Collection of jar URLs.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static Collection<URL> hadoopUrls() throws IgniteCheckedException {
+        Collection<URL> res = hadoopJars;
+
+        if (res == null) {
+            synchronized (HadoopClassLoader.class) {
+                // Re-read under the lock: another thread may have resolved the URLs already.
+                res = hadoopJars;
+
+                if (res == null) {
+                    try {
+                        res = HadoopClasspathUtils.classpathForClassLoader();
+                    }
+                    catch (IOException e) {
+                        throw new IgniteCheckedException("Failed to resolve Hadoop JAR locations: " + e.getMessage(), e);
+                    }
+
+                    hadoopJars = res;
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        // String form is delegated to the S.toString helper.
+        return S.toString(HadoopClassLoader.class, this);
+    }
+
+    /**
+     * Getter for name field.
+     *
+     * @return Diagnostic name of this class loader, as passed to the constructor.
+     */
+    public String name() {
+        return name;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java
new file mode 100644
index 0000000..3415d6a
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+import org.objectweb.asm.AnnotationVisitor;
+import org.objectweb.asm.Attribute;
+import org.objectweb.asm.ClassReader;
+import org.objectweb.asm.ClassVisitor;
+import org.objectweb.asm.ClassWriter;
+import org.objectweb.asm.FieldVisitor;
+import org.objectweb.asm.Handle;
+import org.objectweb.asm.Label;
+import org.objectweb.asm.MethodVisitor;
+import org.objectweb.asm.Opcodes;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.commons.Remapper;
+import org.objectweb.asm.commons.RemappingClassAdapter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Utility methods for the Hadoop class loader, kept in a separate class to avoid direct 3rd-party dependencies in the
+ * class loader itself.
+ */
+public class HadoopClassLoaderUtils {
+    /** Cache for resolved dependency info. */
+    private static final Map<String, Boolean> dependenciesCache = new 
ConcurrentHashMap8<>();
+
+    /**
+     * Reads the replacement class from the stream and rewrites it so that every reference to the replacement
+     * type becomes a reference to the original type, i.e. the result impersonates the original class.
+     * <p>
+     * The stream is not closed by this method.
+     *
+     * @param in Input stream with the replacement class bytes.
+     * @param originalName Original class name.
+     * @param replaceName Replacer class name.
+     * @return Bytes of the rewritten class.
+     */
+    public static byte[] loadReplace(InputStream in, final String originalName, final String replaceName) {
+        ClassReader rdr;
+
+        try {
+            rdr = new ClassReader(in);
+        }
+        catch (IOException e) {
+            throw new RuntimeException("Failed to read class: " + replaceName, e);
+        }
+
+        // ClassWriter expects COMPUTE_* flags here, not an ASM API version. No flags are needed because only
+        // type names are remapped.
+        ClassWriter w = new ClassWriter(0);
+
+        // Internal (slash-separated) forms of both names.
+        final String replaceType = replaceName.replace('.', '/');
+        final String nameType = originalName.replace('.', '/');
+
+        rdr.accept(new RemappingClassAdapter(w, new Remapper() {
+            @Override public String map(String type) {
+                return type.equals(replaceType) ? nameType : type;
+            }
+        }), ClassReader.EXPAND_FRAMES);
+
+        return w.toByteArray();
+    }
+
+    /**
+     * Tells whether the class belongs to the {@code org.apache.hadoop} package tree.
+     *
+     * @param cls Class name.
+     * @return {@code true} If this is Hadoop class.
+     */
+    public static boolean isHadoop(String cls) {
+        final String hadoopPkgPrefix = "org.apache.hadoop.";
+
+        return cls.startsWith(hadoopPkgPrefix);
+    }
+
+    /**
+     * Need to parse only Ignite Hadoop and IGFS classes: names starting with {@code org.apache.ignite} that
+     * contain {@code igfs.}, {@code .fs.} or {@code hadoop.} after that prefix.
+     *
+     * @param cls Class name.
+     * @return {@code true} if we need to check this class.
+     */
+    public static boolean isHadoopIgfs(String cls) {
+        final String ignitePkgPrefix = "org.apache.ignite";
+
+        if (!cls.startsWith(ignitePkgPrefix))
+            return false;
+
+        int from = ignitePkgPrefix.length();
+
+        for (String marker : new String[] {"igfs.", ".fs.", "hadoop."}) {
+            if (cls.indexOf(marker, from) >= 0)
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param ldr Loader.
+     * @param clsName Class.
+     * @return Input stream.
+     */
+    @Nullable public static InputStream loadClassBytes(ClassLoader ldr, String 
clsName) {
+        return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class");
+    }
+
+    /**
+     * Check whether class has external dependencies on Hadoop. Results are cached per class name.
+     *
+     * @param clsName Class name.
+     * @param parentClsLdr Parent class loader.
+     * @return {@code True} if class has external dependencies.
+     */
+    static boolean hasExternalDependencies(String clsName, ClassLoader parentClsLdr) {
+        Boolean cached = dependenciesCache.get(clsName);
+
+        if (cached != null)
+            return cached;
+
+        // Wire up a fresh set of visitors sharing one context.
+        CollectingContext ctx = new CollectingContext(parentClsLdr);
+
+        AnnotationVisitor annVisitor = new CollectingAnnotationVisitor(ctx);
+        ctx.annVisitor = annVisitor;
+
+        MethodVisitor mthdVisitor = new CollectingMethodVisitor(ctx, annVisitor);
+        ctx.mthdVisitor = mthdVisitor;
+
+        FieldVisitor fldVisitor = new CollectingFieldVisitor(ctx, annVisitor);
+        ctx.fldVisitor = fldVisitor;
+
+        ctx.clsVisitor = new CollectingClassVisitor(ctx, annVisitor, mthdVisitor, fldVisitor);
+
+        boolean res = hasExternalDependencies(clsName, parentClsLdr, ctx);
+
+        dependenciesCache.put(clsName, res);
+
+        return res;
+    }
+
+    /**
+     * Check whether class has external dependencies on Hadoop.
+     *
+     * @param clsName Class name.
+     * @param parentClsLdr Parent class loader.
+     * @param ctx Context.
+     * @return {@code true} If the class has external dependencies.
+     */
+    static boolean hasExternalDependencies(String clsName, ClassLoader parentClsLdr, CollectingContext ctx) {
+        // Hadoop must not be in classpath, but an IDE may put it there, so filter it explicitly as external.
+        if (isHadoop(clsName))
+            return true;
+
+        final ClassReader rdr;
+
+        // Try to get from parent to check if the type accessible. The stream is closed on every path, before
+        // the class is visited.
+        try (InputStream in = loadClassBytes(parentClsLdr, clsName)) {
+            if (in == null) // The class is external itself, it must be loaded from this class loader.
+                return true;
+
+            if (!isHadoopIgfs(clsName)) // Other classes should not have external dependencies.
+                return false;
+
+            rdr = new ClassReader(in);
+        }
+        catch (IOException e) {
+            throw new RuntimeException("Failed to read class: " + clsName, e);
+        }
+
+        ctx.visited.add(clsName);
+
+        rdr.accept(ctx.clsVisitor, 0);
+
+        if (ctx.found) // We already know that we have dependencies, no need to check parent.
+            return true;
+
+        // Here we are known to not have any dependencies but possibly we have a parent which has them.
+        int idx = clsName.lastIndexOf('$');
+
+        if (idx == -1) // No parent class.
+            return false;
+
+        String parentCls = clsName.substring(0, idx);
+
+        if (ctx.visited.contains(parentCls))
+            return false;
+
+        Boolean res = dependenciesCache.get(parentCls);
+
+        if (res == null)
+            res = hasExternalDependencies(parentCls, parentClsLdr, ctx);
+
+        return res;
+    }
+
+    /**
+     * Checks that the name is at least two characters long, starts with a Java identifier start character,
+     * otherwise consists only of Java identifier part characters and dots, and contains at least one dot.
+     *
+     * @param name Class name.
+     * @return {@code true} If this is a valid class name.
+     */
+    private static boolean validateClassName(String name) {
+        if (name.length() <= 1 || !Character.isJavaIdentifierStart(name.charAt(0)))
+            return false;
+
+        boolean dotSeen = false;
+
+        for (int pos = 1, end = name.length(); pos < end; pos++) {
+            char ch = name.charAt(pos);
+
+            if (ch == '.') {
+                dotSeen = true;
+
+                continue;
+            }
+
+            if (!Character.isJavaIdentifierPart(ch))
+                return false;
+        }
+
+        return dotSeen;
+    }
+
+    /**
+     * Context for dependencies collection. Shared by all visitors of one dependency check; once {@link #found}
+     * is set, it is never reset.
+     */
+    private static class CollectingContext {
+        /** Visited classes. */
+        private final Set<String> visited = new HashSet<>();
+
+        /** Parent class loader. */
+        private final ClassLoader parentClsLdr;
+
+        /** Whether dependency found. */
+        private boolean found;
+
+        /** Annotation visitor. */
+        private AnnotationVisitor annVisitor;
+
+        /** Method visitor. */
+        private MethodVisitor mthdVisitor;
+
+        /** Field visitor. */
+        private FieldVisitor fldVisitor;
+
+        /** Class visitor. */
+        private ClassVisitor clsVisitor;
+
+        /**
+         * Constructor.
+         *
+         * @param parentClsLdr Parent class loader.
+         */
+        private CollectingContext(ClassLoader parentClsLdr) {
+            this.parentClsLdr = parentClsLdr;
+        }
+
+        /**
+         * Processes a method descriptor: its return type first, then its argument types. Stops as soon as a
+         * dependency is found.
+         *
+         * @param methDesc The method desc String.
+         */
+        void onMethodsDesc(final String methDesc) {
+            // Process method return type:
+            onType(Type.getReturnType(methDesc));
+
+            if (found)
+                return;
+
+            // Process method argument types:
+            for (Type t : Type.getArgumentTypes(methDesc)) {
+                onType(t);
+
+                if (found)
+                    return;
+            }
+        }
+
+        /**
+         * Processes dependencies of a class. Sets {@link #found} if the class is known (from the cache) or
+         * discovered to have external dependencies.
+         *
+         * @param depCls The class name as dot-notated FQN.
+         */
+        void onClass(final String depCls) {
+            assert depCls.indexOf('/') == -1 : depCls; // class name should be fully converted to dot notation.
+            assert depCls.charAt(0) != 'L' : depCls;
+            assert validateClassName(depCls) : depCls;
+
+            if (depCls.startsWith("java.") || depCls.startsWith("javax.")) // Filter out platform classes.
+                return;
+
+            if (visited.contains(depCls))
+                return;
+
+            Boolean res = dependenciesCache.get(depCls);
+
+            // Compare the boxed value with equals() rather than by identity.
+            if (Boolean.TRUE.equals(res) || (res == null && hasExternalDependencies(depCls, parentClsLdr, this)))
+                found = true;
+        }
+
+        /**
+         * Analyses dependencies of given type. Arrays are unwrapped to their element type; only object types
+         * are processed further.
+         *
+         * @param t The type to process.
+         */
+        void onType(Type t) {
+            if (t == null)
+                return;
+
+            int sort = t.getSort();
+
+            switch (sort) {
+                case Type.ARRAY:
+                    onType(t.getElementType());
+
+                    break;
+
+                case Type.OBJECT:
+                    onClass(t.getClassName());
+
+                    break;
+            }
+        }
+
+        /**
+         * Analyses dependencies of given object type.
+         *
+         * @param objType The object type to process: either a slash-separated class name or an array
+         *      type descriptor.
+         */
+        void onInternalTypeName(String objType) {
+            if (objType == null)
+                return;
+
+            assert objType.length() > 1 : objType;
+
+            if (objType.charAt(0) == '[')
+                // handle array. In this case this is a type descriptor notation, like "[Ljava/lang/Object;"
+                onType(objType);
+            else {
+                assert objType.indexOf('.') == -1 : objType; // Must be slash-separated FQN.
+
+                String clsName = objType.replace('/', '.'); // Convert it to dot notation.
+
+                onClass(clsName); // Process.
+            }
+        }
+
+        /**
+         * Type description analyser.
+         *
+         * @param desc The description.
+         */
+        void onType(String desc) {
+            if (!F.isEmpty(desc)) {
+                if (desc.length() <= 1)
+                    return; // Optimization: filter out primitive types in early stage.
+
+                Type t = Type.getType(desc);
+
+                onType(t);
+            }
+        }
+    }
+
+    /**
+     * Annotation visitor. Reports every type referenced by an annotation to the shared context and stops
+     * descending once a dependency has been found.
+     */
+    private static class CollectingAnnotationVisitor extends AnnotationVisitor {
+        /** Shared collecting context. */
+        final CollectingContext ctx;
+
+        /**
+         * Annotation visitor.
+         *
+         * @param ctx The collector.
+         */
+        CollectingAnnotationVisitor(CollectingContext ctx) {
+            super(Opcodes.ASM4);
+
+            this.ctx = ctx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public AnnotationVisitor visitAnnotation(String name, String desc) {
+            if (ctx.found)
+                return null;
+
+            ctx.onType(desc);
+
+            // Same convention as the other visitors: do not descend once a dependency is found.
+            return ctx.found ? null : this;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitEnum(String name, String desc, String val) {
+            if (ctx.found)
+                return;
+
+            ctx.onType(desc);
+        }
+
+        /** {@inheritDoc} */
+        @Override public AnnotationVisitor visitArray(String name) {
+            return ctx.found ? null : this;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visit(String name, Object val) {
+            if (ctx.found)
+                return;
+
+            // Only class-valued annotation members reference a type.
+            if (val instanceof Type)
+                ctx.onType((Type)val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitEnd() {
+            // No-op.
+        }
+    }
+
+    /**
+     * Field visitor. Only field annotations are inspected here.
+     */
+    private static class CollectingFieldVisitor extends FieldVisitor {
+        /** Collector. */
+        private final CollectingContext ctx;
+
+        /** Annotation visitor. */
+        private final AnnotationVisitor av;
+
+        /**
+         * Constructor.
+         *
+         * @param ctx Collector.
+         * @param av Annotation visitor.
+         */
+        CollectingFieldVisitor(CollectingContext ctx, AnnotationVisitor av) {
+            super(Opcodes.ASM4);
+
+            this.av = av;
+            this.ctx = ctx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+            if (!ctx.found)
+                ctx.onType(desc);
+
+            if (ctx.found)
+                return null;
+
+            return av;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitAttribute(Attribute attr) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitEnd() {
+            // No-op.
+        }
+    }
+
+    /**
+     * Class visitor. Reports the super class, interfaces, inner classes, annotations, fields and methods of
+     * the visited class to the shared context and stops as soon as a dependency is found.
+     */
+    private static class CollectingClassVisitor extends ClassVisitor {
+        /** Collector. */
+        private final CollectingContext ctx;
+
+        /** Annotation visitor. */
+        private final AnnotationVisitor av;
+
+        /** Method visitor. */
+        private final MethodVisitor mv;
+
+        /** Field visitor. */
+        private final FieldVisitor fv;
+
+        /**
+         * Constructor.
+         *
+         * @param ctx Collector.
+         * @param av Annotation visitor.
+         * @param mv Method visitor.
+         * @param fv Field visitor.
+         */
+        CollectingClassVisitor(CollectingContext ctx, AnnotationVisitor av, MethodVisitor mv, FieldVisitor fv) {
+            super(Opcodes.ASM4);
+
+            this.ctx = ctx;
+            this.av = av;
+            this.mv = mv;
+            this.fv = fv;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visit(int i, int i2, String name, String signature, String superName, String[] ifaces) {
+            if (ctx.found)
+                return;
+
+            ctx.onInternalTypeName(superName);
+
+            if (ctx.found)
+                return;
+
+            if (ifaces != null) {
+                for (String iface : ifaces) {
+                    ctx.onInternalTypeName(iface);
+
+                    if (ctx.found)
+                        return;
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+            if (ctx.found)
+                return null;
+
+            ctx.onType(desc);
+
+            return ctx.found ? null : av;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitInnerClass(String name, String outerName, String innerName, int i) {
+            if (ctx.found)
+                return;
+
+            ctx.onInternalTypeName(name);
+        }
+
+        /** {@inheritDoc} */
+        @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) {
+            if (ctx.found)
+                return null;
+
+            ctx.onType(desc);
+
+            return ctx.found ? null : fv;
+        }
+
+        /** {@inheritDoc} */
+        @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature,
+            String[] exceptions) {
+            if (ctx.found)
+                return null;
+
+            ctx.onMethodsDesc(desc);
+
+            // Process declared method exceptions, stopping at the first dependency found (same as for
+            // interfaces in visit()).
+            if (!ctx.found && exceptions != null) {
+                for (String e : exceptions) {
+                    ctx.onInternalTypeName(e);
+
+                    if (ctx.found)
+                        break;
+                }
+            }
+
+            return ctx.found ? null : mv;
+        }
+    }
+
+    /**
+     * Method visitor. Reports types referenced by method annotations and by the instructions handled below
+     * to the shared context; every callback is a no-op once a dependency has been found.
+     */
+    private static class CollectingMethodVisitor extends MethodVisitor {
+        /** Collector. */
+        private final CollectingContext ctx;
+
+        /** Annotation visitor. */
+        private final AnnotationVisitor av;
+
+        /**
+         * Constructor.
+         *
+         * @param ctx Collector.
+         * @param av Annotation visitor.
+         */
+        private CollectingMethodVisitor(CollectingContext ctx, AnnotationVisitor av) {
+            super(Opcodes.ASM4);
+
+            this.av = av;
+            this.ctx = ctx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+            if (!ctx.found)
+                ctx.onType(desc);
+
+            return ctx.found ? null : av;
+        }
+
+        /** {@inheritDoc} */
+        @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) {
+            if (!ctx.found)
+                ctx.onType(desc);
+
+            return ctx.found ? null : av;
+        }
+
+        /** {@inheritDoc} */
+        @Override public AnnotationVisitor visitAnnotationDefault() {
+            if (ctx.found)
+                return null;
+
+            return av;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitFieldInsn(int opcode, String owner, String name, String desc) {
+            // Field owner first, then the field type.
+            if (!ctx.found)
+                ctx.onInternalTypeName(owner);
+
+            if (!ctx.found)
+                ctx.onType(desc);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitInvokeDynamicInsn(String name, String desc, Handle bsm, Object... bsmArgs) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitFrame(int type, int nLoc, Object[] locTypes, int nStack, Object[] stackTypes) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitLocalVariable(String name, String desc, String signature, Label lb,
+            Label lb2, int i) {
+            if (!ctx.found)
+                ctx.onType(desc);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitMethodInsn(int i, String owner, String name, String desc) {
+            // Method owner first, then the return and argument types.
+            if (!ctx.found)
+                ctx.onInternalTypeName(owner);
+
+            if (!ctx.found)
+                ctx.onMethodsDesc(desc);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitMultiANewArrayInsn(String desc, int dim) {
+            if (!ctx.found)
+                ctx.onType(desc);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitTryCatchBlock(Label start, Label end, Label hndl, String typeStr) {
+            if (!ctx.found)
+                ctx.onInternalTypeName(typeStr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visitTypeInsn(int opcode, String type) {
+            if (!ctx.found)
+                ctx.onInternalTypeName(type);
+        }
+    }
+}

Reply via email to