diff --git 
new file mode 100644
index 0000000..aeda5c0
--- /dev/null
@@ -0,0 +1,62 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+ * Abstract class for all hadoop components.
+ */
+public abstract class HadoopComponent {
+    /** Hadoop context. */
+    protected HadoopContext ctx;
+    /** Logger. */
+    protected IgniteLogger log;
+    /**
+     * @param ctx Hadoop context.
+     */
+    public void start(HadoopContext ctx) throws IgniteCheckedException {
+        this.ctx = ctx;
+        log = ctx.kernalContext().log(getClass());
+    }
+    /**
+     * Stops manager.
+     */
+    public void stop(boolean cancel) {
+        // No-op.
+    }
+    /**
+     * Callback invoked when all grid components are started.
+     */
+    public void onKernalStart() throws IgniteCheckedException {
+        // No-op.
+    }
+    /**
+     * Callback invoked before all grid components are stopped.
+     */
+    public void onKernalStop(boolean cancel) {
+        // No-op.
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..42a3d72
--- /dev/null
@@ -0,0 +1,201 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+ * Hadoop accelerator context.
+ */
+public class HadoopContext {
+    /** Kernal context. */
+    private GridKernalContext ctx;
+    /** Hadoop configuration. */
+    private HadoopConfiguration cfg;
+    /** Job tracker. */
+    private HadoopJobTracker jobTracker;
+    /** External task executor. */
+    private HadoopTaskExecutorAdapter taskExecutor;
+    /** */
+    private HadoopShuffle shuffle;
+    /** Managers list. */
+    private List<HadoopComponent> components = new ArrayList<>();
+    /**
+     * @param ctx Kernal context.
+     */
+    public HadoopContext(
+        GridKernalContext ctx,
+        HadoopConfiguration cfg,
+        HadoopJobTracker jobTracker,
+        HadoopTaskExecutorAdapter taskExecutor,
+        HadoopShuffle shuffle
+    ) {
+        this.ctx = ctx;
+        this.cfg = cfg;
+        this.jobTracker = add(jobTracker);
+        this.taskExecutor = add(taskExecutor);
+        this.shuffle = add(shuffle);
+    }
+    /**
+     * Gets list of managers.
+     *
+     * @return List of managers.
+     */
+    public List<HadoopComponent> components() {
+        return components;
+    }
+    /**
+     * Gets kernal context.
+     *
+     * @return Grid kernal context instance.
+     */
+    public GridKernalContext kernalContext() {
+        return ctx;
+    }
+    /**
+     * Gets Hadoop configuration.
+     *
+     * @return Hadoop configuration.
+     */
+    public HadoopConfiguration configuration() {
+        return cfg;
+    }
+    /**
+     * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}.
+     *
+     * @return Local node ID.
+     */
+    public UUID localNodeId() {
+        return ctx.localNodeId();
+    }
+    /**
+     * Gets local node order.
+     *
+     * @return Local node order.
+     */
+    public long localNodeOrder() {
+        assert ctx.discovery() != null;
+        return ctx.discovery().localNode().order();
+    }
+    /**
+     * @return Hadoop-enabled nodes.
+     */
+    public Collection<ClusterNode> nodes() {
+        return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, 
+    }
+    /**
+     * @return {@code True} if
+     */
+    public boolean jobUpdateLeader() {
+        long minOrder = Long.MAX_VALUE;
+        ClusterNode minOrderNode = null;
+        for (ClusterNode node : nodes()) {
+            if (node.order() < minOrder) {
+                minOrder = node.order();
+                minOrderNode = node;
+            }
+        }
+        assert minOrderNode != null;
+        return localNodeId().equals(;
+    }
+    /**
+     * @param meta Job metadata.
+     * @return {@code true} If local node is participating in job execution.
+     */
+    public boolean isParticipating(HadoopJobMetadata meta) {
+        UUID locNodeId = localNodeId();
+        if (locNodeId.equals(meta.submitNodeId()))
+            return true;
+        HadoopMapReducePlan plan = meta.mapReducePlan();
+        return plan.mapperNodeIds().contains(locNodeId) || 
plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader();
+    }
+    /**
+     * @return Jon tracker instance.
+     */
+    public HadoopJobTracker jobTracker() {
+        return jobTracker;
+    }
+    /**
+     * @return Task executor.
+     */
+    public HadoopTaskExecutorAdapter taskExecutor() {
+        return taskExecutor;
+    }
+    /**
+     * @return Shuffle.
+     */
+    public HadoopShuffle shuffle() {
+        return shuffle;
+    }
+    /**
+     * @return Map-reduce planner.
+     */
+    public HadoopMapReducePlanner planner() {
+        return cfg.getMapReducePlanner();
+    }
+    /**
+     * Adds component.
+     *
+     * @param c Component to add.
+     * @return Added manager.
+     */
+    private <C extends HadoopComponent> C add(C c) {
+        components.add(c);
+        return c;
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..1382c1f
--- /dev/null
@@ -0,0 +1,156 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+ * Hadoop job info based on default Hadoop configuration.
+ */
+public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
+    /** */
+    private static final long serialVersionUID = 5489900236464999951L;
+    /** {@code true} If job has combiner. */
+    private boolean hasCombiner;
+    /** Number of reducers configured for job. */
+    private int numReduces;
+    /** Configuration. */
+    private Map<String,String> props = new HashMap<>();
+    /** Job name. */
+    private String jobName;
+    /** User name. */
+    private String user;
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    public HadoopDefaultJobInfo() {
+        // No-op.
+    }
+    /**
+     * Constructor.
+     *
+     * @param jobName Job name.
+     * @param user User name.
+     * @param hasCombiner {@code true} If job has combiner.
+     * @param numReduces Number of reducers configured for job.
+     * @param props All other properties of the job.
+     */
+    public HadoopDefaultJobInfo(String jobName, String user, boolean 
hasCombiner, int numReduces,
+        Map<String, String> props) {
+        this.jobName = jobName;
+        this.user = user;
+        this.hasCombiner = hasCombiner;
+        this.numReduces = numReduces;
+        this.props = props;
+    }
+    /** {@inheritDoc} */
+    @Nullable @Override public String property(String name) {
+        return props.get(name);
+    }
+    /** {@inheritDoc} */
+    @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, 
HadoopJobId jobId, IgniteLogger log,
+        @Nullable String[] libNames) throws IgniteCheckedException {
+        assert jobCls != null;
+        try {
+            Constructor<? extends HadoopJob> constructor = 
+                HadoopDefaultJobInfo.class, IgniteLogger.class, 
+            return constructor.newInstance(jobId, this, log, libNames);
+        }
+        catch (Throwable t) {
+            if (t instanceof Error)
+                throw (Error)t;
+            throw new IgniteCheckedException(t);
+        }
+    }
+    /** {@inheritDoc} */
+    @Override public boolean hasCombiner() {
+        return hasCombiner;
+    }
+    /** {@inheritDoc} */
+    @Override public boolean hasReducer() {
+        return reducers() > 0;
+    }
+    /** {@inheritDoc} */
+    @Override public int reducers() {
+        return numReduces;
+    }
+    /** {@inheritDoc} */
+    @Override public String jobName() {
+        return jobName;
+    }
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return user;
+    }
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, jobName);
+        U.writeString(out, user);
+        out.writeBoolean(hasCombiner);
+        out.writeInt(numReduces);
+        U.writeStringMap(out, props);
+    }
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        jobName = U.readString(in);
+        user = U.readString(in);
+        hasCombiner = in.readBoolean();
+        numReduces = in.readInt();
+        props = U.readStringMap(in);
+    }
+    /**
+     * @return Properties of the job.
+     */
+    public Map<String, String> properties() {
+        return props;
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..ed2657e
--- /dev/null
@@ -0,0 +1,134 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+ * Hadoop facade implementation.
+ */
+public class HadoopImpl implements Hadoop {
+    /** Hadoop processor. */
+    private final HadoopProcessor proc;
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+    /**
+     * Constructor.
+     *
+     * @param proc Hadoop processor.
+     */
+    HadoopImpl(HadoopProcessor proc) {
+        this.proc = proc;
+    }
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration configuration() {
+        return proc.config();
+    }
+    /** {@inheritDoc} */
+    @Override public HadoopJobId nextJobId() {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.nextJobId();
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to get next job ID (grid 
is stopping).");
+    }
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, 
HadoopJobInfo jobInfo) {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.submit(jobId, jobInfo);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to submit job (grid is 
+    }
+    /** {@inheritDoc} */
+    @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) 
throws IgniteCheckedException {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.status(jobId);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to get job status (grid is 
+    }
+    /** {@inheritDoc} */
+    @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) 
throws IgniteCheckedException {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.counters(jobId);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to get job counters (grid 
is stopping).");
+    }
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteInternalFuture<?> 
finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.finishFuture(jobId);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to get job finish future 
(grid is stopping).");
+    }
+    /** {@inheritDoc} */
+    @Override public boolean kill(HadoopJobId jobId) throws 
IgniteCheckedException {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.kill(jobId);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to kill job (grid is 
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..4e03e17
--- /dev/null
@@ -0,0 +1,123 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import java.util.Iterator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+ * Hadoop +counter group adapter.
+ */
+class HadoopMapReduceCounterGroup implements CounterGroup {
+    /** Counters. */
+    private final HadoopMapReduceCounters cntrs;
+    /** Group name. */
+    private final String name;
+    /**
+     * Creates new instance.
+     *
+     * @param cntrs Client counters instance.
+     * @param name Group name.
+     */
+    HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
+        this.cntrs = cntrs;
+ = name;
+    }
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return name;
+    }
+    /** {@inheritDoc} */
+    @Override public String getDisplayName() {
+        return name;
+    }
+    /** {@inheritDoc} */
+    @Override public void setDisplayName(String displayName) {
+        // No-op.
+    }
+    /** {@inheritDoc} */
+    @Override public void addCounter(Counter counter) {
+        addCounter(counter.getName(), counter.getDisplayName(), 0);
+    }
+    /** {@inheritDoc} */
+    @Override public Counter addCounter(String name, String displayName, long 
value) {
+        final Counter counter = cntrs.findCounter(, name);
+        counter.setValue(value);
+        return counter;
+    }
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName, String 
displayName) {
+        return cntrs.findCounter(name, counterName);
+    }
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName, boolean create) {
+        return cntrs.findCounter(name, counterName, create);
+    }
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName) {
+        return cntrs.findCounter(name, counterName);
+    }
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return cntrs.groupSize(name);
+    }
+    /** {@inheritDoc} */
+    @Override public void incrAllCounters(CounterGroupBase<Counter> 
rightGroup) {
+        for (final Counter counter : rightGroup)
+            cntrs.findCounter(name, 
+    }
+    /** {@inheritDoc} */
+    @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
+        return this;
+    }
+    /** {@inheritDoc} */
+    @Override public Iterator<Counter> iterator() {
+        return cntrs.iterateGroup(name);
+    }
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+    /** {@inheritDoc} */
+    @Override public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..57a853f
--- /dev/null
@@ -0,0 +1,228 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Counter;
+import org.apache.ignite.internal.util.typedef.T2;
+ * Hadoop counters adapter.
+ */
+public class HadoopMapReduceCounters extends Counters {
+    /** */
+    private final Map<T2<String,String>,HadoopLongCounter> cntrs = new 
+    /**
+     * Creates new instance based on given counters.
+     *
+     * @param cntrs Counters to adapt.
+     */
+    public 
 cntrs) {
+        for (HadoopCounter cntr : cntrs.all())
+            if (cntr instanceof HadoopLongCounter)
+                this.cntrs.put(new T2<>(,, 
(HadoopLongCounter) cntr);
+    }
+    /** {@inheritDoc} */
+    @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
+        return addGroup(grp.getName(), grp.getDisplayName());
+    }
+    /** {@inheritDoc} */
+    @Override public CounterGroup addGroup(String name, String displayName) {
+        return new HadoopMapReduceCounterGroup(this, name);
+    }
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String grpName, String cntrName) {
+        return findCounter(grpName, cntrName, true);
+    }
+    /** {@inheritDoc} */
+    @Override public synchronized Counter findCounter(Enum<?> key) {
+        return findCounter(key.getDeclaringClass().getName(),, 
+    }
+    /** {@inheritDoc} */
+    @Override public synchronized Counter findCounter(String scheme, 
FileSystemCounter key) {
+        return findCounter(String.format("FileSystem Counter (%s)", scheme),;
+    }
+    /** {@inheritDoc} */
+    @Override public synchronized Iterable<String> getGroupNames() {
+        Collection<String> res = new HashSet<>();
+        for (HadoopCounter counter : cntrs.values())
+            res.add(;
+        return res;
+    }
+    /** {@inheritDoc} */
+    @Override public Iterator<CounterGroup> iterator() {
+        final Iterator<String> iter = getGroupNames().iterator();
+        return new Iterator<CounterGroup>() {
+            @Override public boolean hasNext() {
+                return iter.hasNext();
+            }
+            @Override public CounterGroup next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+                return new 
+            }
+            @Override public void remove() {
+                throw new UnsupportedOperationException("not implemented");
+            }
+        };
+    }
+    /** {@inheritDoc} */
+    @Override public synchronized CounterGroup getGroup(String grpName) {
+        return new HadoopMapReduceCounterGroup(this, grpName);
+    }
+    /** {@inheritDoc} */
+    @Override public synchronized int countCounters() {
+        return cntrs.size();
+    }
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataOutput out) throws 
IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+    /** {@inheritDoc} */
+    @Override public synchronized void readFields(DataInput in) throws 
IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+    /** {@inheritDoc} */
+    @Override public synchronized void 
incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
+        for (CounterGroup group : other) {
+            for (Counter counter : group) {
+                findCounter(group.getName(), 
+            }
+        }
+    }
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object genericRight) {
+        if (!(genericRight instanceof HadoopMapReduceCounters))
+            return false;
+        return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs);
+    }
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return cntrs.hashCode();
+    }
+    /** {@inheritDoc} */
+    @Override public void setWriteAllCounters(boolean snd) {
+        // No-op.
+    }
+    /** {@inheritDoc} */
+    @Override public boolean getWriteAllCounters() {
+        return true;
+    }
+    /** {@inheritDoc} */
+    @Override public Limits limits() {
+        return null;
+    }
+    /**
+     * Returns size of a group.
+     *
+     * @param grpName Name of the group.
+     * @return amount of counters in the given group.
+     */
+    public int groupSize(String grpName) {
+        int res = 0;
+        for (HadoopCounter counter : cntrs.values()) {
+            if (grpName.equals(
+                res++;
+        }
+        return res;
+    }
+    /**
+     * Returns counters iterator for specified group.
+     *
+     * @param grpName Name of the group to iterate.
+     * @return Counters iterator.
+     */
+    public Iterator<Counter> iterateGroup(String grpName) {
+        Collection<Counter> grpCounters = new ArrayList<>();
+        for (HadoopLongCounter counter : cntrs.values()) {
+            if (grpName.equals(
+                grpCounters.add(new HadoopV2Counter(counter));
+        }
+        return grpCounters.iterator();
+    }
+    /**
+     * Find a counter in the group.
+     *
+     * @param grpName The name of the counter group.
+     * @param cntrName The name of the counter.
+     * @param create Create the counter if not found if true.
+     * @return The counter that was found or added or {@code null} if create 
is false.
+     */
+    public Counter findCounter(String grpName, String cntrName, boolean 
create) {
+        T2<String, String> key = new T2<>(grpName, cntrName);
+        HadoopLongCounter internalCntr = cntrs.get(key);
+        if (internalCntr == null & create) {
+            internalCntr = new HadoopLongCounter(grpName,cntrName);
+            cntrs.put(key, new HadoopLongCounter(grpName,cntrName));
+        }
+        return internalCntr == null ? null : new HadoopV2Counter(internalCntr);
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..b9c20c3
--- /dev/null
@@ -0,0 +1,223 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicInteger;
+ * Hadoop processor.
+ */
+public class HadoopProcessor extends HadoopProcessorAdapter {
+    /** Job ID counter. */
+    private final AtomicInteger idCtr = new AtomicInteger();
+    /** Hadoop context. */
+    @GridToStringExclude
+    private HadoopContext hctx;
+    /** Hadoop facade for public API. */
+    @GridToStringExclude
+    private Hadoop hadoop;
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    public HadoopProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.isDaemon())
+            return;
+        HadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
+        if (cfg == null)
+            cfg = new HadoopConfiguration();
+        else
+            cfg = new HadoopConfiguration(cfg);
+        initializeDefaults(cfg);
+        hctx = new HadoopContext(
+            ctx,
+            cfg,
+            new HadoopJobTracker(),
+            new HadoopEmbeddedTaskExecutor(),
+            // TODO: IGNITE-404: Uncomment when fixed.
+            //cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : 
new HadoopEmbeddedTaskExecutor(),
+            new HadoopShuffle());
+        for (HadoopComponent c : hctx.components())
+            c.start(hctx);
+        hadoop = new HadoopImpl(this);
+        ctx.addNodeAttribute(HadoopAttributes.NAME, new HadoopAttributes(cfg));
+    }
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+        if (hctx == null)
+            return;
+        for (HadoopComponent c : hctx.components())
+            c.onKernalStart();
+    }
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+        if (hctx == null)
+            return;
+        List<HadoopComponent> components = hctx.components();
+        for (ListIterator<HadoopComponent> it = 
components.listIterator(components.size()); it.hasPrevious();) {
+            HadoopComponent c = it.previous();
+            c.onKernalStop(cancel);
+        }
+    }
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        super.stop(cancel);
+        if (hctx == null)
+            return;
+        List<HadoopComponent> components = hctx.components();
+        for (ListIterator<HadoopComponent> it = 
components.listIterator(components.size()); it.hasPrevious();) {
+            HadoopComponent c = it.previous();
+            c.stop(cancel);
+        }
+    }
+    /**
+     * Gets Hadoop context.
+     *
+     * @return Hadoop context.
+     */
+    public HadoopContext context() {
+        return hctx;
+    }
+    /** {@inheritDoc} */
+    @Override public Hadoop hadoop() {
+        if (hadoop == null)
+            throw new IllegalStateException("Hadoop accelerator is disabled 
(Hadoop is not in classpath, " +
+                "is HADOOP_HOME environment variable set?)");
+        return hadoop;
+    }
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration config() {
+        return hctx.configuration();
+    }
+    /** {@inheritDoc} */
+    @Override public HadoopJobId nextJobId() {
+        return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
+    }
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, 
HadoopJobInfo jobInfo) {
+        return hctx.jobTracker().submit(jobId, jobInfo);
+    }
+    /** {@inheritDoc} */
+    @Override public HadoopJobStatus status(HadoopJobId jobId) throws 
IgniteCheckedException {
+        return hctx.jobTracker().status(jobId);
+    }
+    /** {@inheritDoc} */
+    @Override public HadoopCounters counters(HadoopJobId jobId) throws 
IgniteCheckedException {
+        return hctx.jobTracker().jobCounters(jobId);
+    }
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) 
throws IgniteCheckedException {
+        return hctx.jobTracker().finishFuture(jobId);
+    }
+    /** {@inheritDoc} */
+    @Override public boolean kill(HadoopJobId jobId) throws 
IgniteCheckedException {
+        return hctx.jobTracker().killJob(jobId);
+    }
+    /** {@inheritDoc} */
+    @Override public void validateEnvironment() throws IgniteCheckedException {
+        // Perform some static checks as early as possible, so that any 
recoverable exceptions are thrown here.
+        try {
+            HadoopLocations loc = HadoopClasspathUtils.locations();
+            if (!F.isEmpty(loc.home()))
+                U.quietAndInfo(log, HadoopClasspathUtils.HOME + " is set to " 
+ loc.home());
+            U.quietAndInfo(log, "Resolved Hadoop classpath locations: " + 
loc.common() + ", " + loc.hdfs() + ", " +
+                loc.mapred());
+        }
+        catch (IOException ioe) {
+            throw new IgniteCheckedException(ioe.getMessage(), ioe);
+        }
+        HadoopClassLoader.hadoopUrls();
+    }
+    /**
+     * Initializes default hadoop configuration.
+     *
+     * @param cfg Hadoop configuration.
+     */
+    private void initializeDefaults(HadoopConfiguration cfg) {
+        if (cfg.getMapReducePlanner() == null)
+            cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner());
+    }
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopProcessor.class, this);
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..ed39ce5
--- /dev/null
@@ -0,0 +1,542 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Scanner;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+ * Setup tool to configure Hadoop client.
+ */
+public class HadoopSetup {
+    /** */
+    public static final String WINUTILS_EXE = "winutils.exe";
+    /** */
+    private static final FilenameFilter IGNITE_JARS = new FilenameFilter() {
+        @Override public boolean accept(File dir, String name) {
+            return name.startsWith("ignite-") && name.endsWith(".jar");
+        }
+    };
+    /**
+     * The main method.
+     * @param ignore Params.
+     */
+    public static void main(String[] ignore) {
+        X.println(
+            "   __________  ________________ ",
+            "  /  _/ ___/ |/ /  _/_  __/ __/ ",
+            " _/ // (7 7    // /  / / / _/   ",
+            "/___/\\___/_/|_/___/ /_/ /___/  ",
+            "                for Apache Hadoop        ",
+            " ",
+            "ver. " + ACK_VER_STR,
+            COPYRIGHT);
+        configureHadoop();
+    }
+    /**
+     * This operation prepares the clean unpacked Hadoop distributive to work 
as client with Ignite-Hadoop.
+     * It performs these operations:
+     * <ul>
+     *     <li>Check for setting of HADOOP_HOME environment variable.</li>
+     *     <li>Try to resolve HADOOP_COMMON_HOME or evaluate it relative to 
+     *     <li>In Windows check if winutils.exe exists and try to fix issue 
with some restrictions.</li>
+     *     <li>In Windows check new line character issues in CMD scripts.</li>
+     *     <li>Scan Hadoop lib directory to detect Ignite JARs. If these don't 
exist tries to create ones.</li>
+     * </ul>
+     */
+    private static void configureHadoop() {
+        String igniteHome = U.getIgniteHome();
+        println("IGNITE_HOME is set to '" + igniteHome + "'.");
+        checkIgniteHome(igniteHome);
+        String homeVar = "HADOOP_HOME";
+        String hadoopHome = System.getenv(homeVar);
+        if (F.isEmpty(hadoopHome)) {
+            homeVar = "HADOOP_PREFIX";
+            hadoopHome = System.getenv(homeVar);
+        }
+        if (F.isEmpty(hadoopHome))
+            exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable 
is set. Please set one of them to a " +
+                "valid Hadoop installation directory and run setup tool 
again.", null);
+        hadoopHome = hadoopHome.replaceAll("\"", "");
+        println(homeVar + " is set to '" + hadoopHome + "'.");
+        String hiveHome = System.getenv("HIVE_HOME");
+        if (!F.isEmpty(hiveHome)) {
+            hiveHome = hiveHome.replaceAll("\"", "");
+            println("HIVE_HOME is set to '" + hiveHome + "'.");
+        }
+        File hadoopDir = new File(hadoopHome);
+        if (!hadoopDir.exists())
+            exit("Hadoop installation folder does not exist.", null);
+        if (!hadoopDir.isDirectory())
+            exit("HADOOP_HOME must point to a directory.", null);
+        if (!hadoopDir.canRead())
+            exit("Hadoop installation folder can not be read. Please check 
permissions.", null);
+        final File hadoopCommonDir;
+        String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME");
+        if (F.isEmpty(hadoopCommonHome)) {
+            hadoopCommonDir = new File(hadoopDir, "share/hadoop/common");
+            println("HADOOP_COMMON_HOME is not set, will use '" + 
hadoopCommonDir.getPath() + "'.");
+        }
+        else {
+            println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + 
+            hadoopCommonDir = new File(hadoopCommonHome);
+        }
+        if (!hadoopCommonDir.canRead())
+            exit("Failed to read Hadoop common dir '" + hadoopCommonDir + 
"'.", null);
+        final File hadoopCommonLibDir = new File(hadoopCommonDir, "lib");
+        if (!hadoopCommonLibDir.canRead())
+            exit("Failed to read Hadoop 'lib' folder in '" + 
hadoopCommonLibDir.getPath() + "'.", null);
+        if (U.isWindows()) {
+            checkJavaPathSpaces();
+            final File hadoopBinDir = new File(hadoopDir, "bin");
+            if (!hadoopBinDir.canRead())
+                exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", 
+            File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE);
+            if (!winutilsFile.exists()) {
+                if (ask("File '" + WINUTILS_EXE + "' does not exist. " +
+                    "It may be replaced by a stub. Create it?")) {
+                    println("Creating file stub '" + 
winutilsFile.getAbsolutePath() + "'.");
+                    boolean ok = false;
+                    try {
+                        ok = winutilsFile.createNewFile();
+                    }
+                    catch (IOException ignore) {
+                        // No-op.
+                    }
+                    if (!ok)
+                        exit("Failed to create '" + WINUTILS_EXE + "' file. 
Please check permissions.", null);
+                }
+                else
+                    println("Ok. But Hadoop client probably will not work on 
Windows this way...");
+            }
+            processCmdFiles(hadoopDir, "bin", "sbin", "libexec");
+        }
+        File igniteLibs = new File(new File(igniteHome), "libs");
+        if (!igniteLibs.exists())
+            exit("Ignite 'libs' folder is not found.", null);
+        Collection<File> jarFiles = new ArrayList<>();
+        addJarsInFolder(jarFiles, igniteLibs);
+        addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop"));
+        addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop-impl"));
+        boolean jarsLinksCorrect = true;
+        for (File file : jarFiles) {
+            File link = new File(hadoopCommonLibDir, file.getName());
+            jarsLinksCorrect &= isJarLinkCorrect(link, file);
+            if (!jarsLinksCorrect)
+                break;
+        }
+        if (!jarsLinksCorrect) {
+            if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. 
" +
+                "Create appropriate symbolic links?")) {
+                File[] oldIgniteJarFiles = 
+                if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' 
directory contains JARs from other Ignite " +
+                    "installation. They must be deleted to continue. 
Continue?")) {
+                    for (File file : oldIgniteJarFiles) {
+                        println("Deleting file '" + file.getAbsolutePath() + 
+                        if (!file.delete())
+                            exit("Failed to delete file '" + file.getPath() + 
"'.", null);
+                    }
+                }
+                for (File file : jarFiles) {
+                    File targetFile = new File(hadoopCommonLibDir, 
+                    try {
+                        println("Creating symbolic link '" + 
targetFile.getAbsolutePath() + "'.");
+                        Files.createSymbolicLink(targetFile.toPath(), 
+                    }
+                    catch (IOException e) {
+                        if (U.isWindows()) {
+                            warn("Ability to create symbolic links is 
+                            warn("On Windows platform you have to grant 
permission 'Create symbolic links'");
+                            warn("to your user or run the Accelerator as 
+                        }
+                        exit("Creating symbolic link failed! Check 
permissions.", e);
+                    }
+                }
+            }
+            else
+                println("Ok. But Hadoop client will not be able to talk to 
Ignite cluster without those JARs in classpath...");
+        }
+        File hadoopEtc = new File(hadoopDir, "etc" + File.separator + 
+        File igniteHadoopCfg = igniteHadoopConfig(igniteHome);
+        if (!igniteHadoopCfg.canRead())
+            exit("Failed to read Ignite Hadoop 'config' folder at '" + 
igniteHadoopCfg.getAbsolutePath() + "'.", null);
+        if (hadoopEtc.canWrite()) { // TODO Bigtop
+            if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with 
preconfigured templates " +
+                "(existing files will be backed up)?")) {
+                replaceWithBackup(new File(igniteHadoopCfg, 
+                    new File(hadoopEtc, "core-site.xml"));
+                replaceWithBackup(new File(igniteHadoopCfg, 
+                    new File(hadoopEtc, "mapred-site.xml"));
+            }
+            else
+                println("Ok. You can configure them later, the templates are 
available at Ignite's 'docs' directory...");
+        }
+        if (!F.isEmpty(hiveHome)) {
+            File hiveConfDir = new File(hiveHome + File.separator + "conf");
+            if (!hiveConfDir.canWrite())
+                warn("Can not write to '" + hiveConfDir.getAbsolutePath() + 
"'. To run Hive queries you have to " +
+                    "configure 'hive-site.xml' manually. The template is 
available at Ignite's 'docs' directory.");
+            else if (ask("Replace 'hive-site.xml' with preconfigured template 
(existing file will be backed up)?"))
+                replaceWithBackup(new File(igniteHadoopCfg, 
+                    new File(hiveConfDir, "hive-site.xml"));
+            else
+                println("Ok. You can configure it later, the template is 
available at Ignite's 'docs' directory...");
+        }
+        println("Apache Hadoop setup is complete.");
+    }
+    /**
+     * Get Ignite Hadoop config directory.
+     *
+     * @param igniteHome Ignite home.
+     * @return Ignite Hadoop config directory.
+     */
+    private static File igniteHadoopConfig(String igniteHome) {
+        Path path = Paths.get(igniteHome, "modules", "hadoop", "config");
+        if (!Files.exists(path))
+            path = Paths.get(igniteHome, "config", "hadoop");
+        if (Files.exists(path))
+            return path.toFile();
+        else
+            return new File(igniteHome, "docs");
+    }
+    /**
+     * @param jarFiles Jars.
+     * @param folder Folder.
+     */
+    private static void addJarsInFolder(Collection<File> jarFiles, File 
folder) {
+        if (!folder.exists())
+            exit("Folder '" + folder.getAbsolutePath() + "' is not found.", 
+        jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS)));
+    }
+    /**
+     * Checks that JAVA_HOME does not contain space characters.
+     */
+    private static void checkJavaPathSpaces() {
+        String javaHome = System.getProperty("java.home");
+        if (javaHome.contains(" ")) {
+            warn("Java installation path contains space characters!");
+            warn("Hadoop client will not be able to start using '" + javaHome 
+ "'.");
+            warn("Please install JRE to path which does not contain spaces and 
point JAVA_HOME to that installation.");
+        }
+    }
+    /**
+     * Checks Ignite home.
+     *
+     * @param igniteHome Ignite home.
+     */
+    private static void checkIgniteHome(String igniteHome) {
+        URL jarUrl = 
+        try {
+            Path jar = Paths.get(jarUrl.toURI());
+            Path igHome = Paths.get(igniteHome);
+            if (!jar.startsWith(igHome))
+                exit("Ignite JAR files are not under IGNITE_HOME.", null);
+        }
+        catch (Exception e) {
+            exit(e.getMessage(), e);
+        }
+    }
+    /**
+     * Replaces target file with source file.
+     *
+     * @param from From.
+     * @param to To.
+     */
+    private static void replaceWithBackup(File from, File to) {
+        if (!from.canRead())
+            exit("Failed to read source file '" + from.getAbsolutePath() + 
"'.", null);
+        println("Replacing file '" + to.getAbsolutePath() + "'.");
+        try {
+            U.copy(from, renameToBak(to), true);
+        }
+        catch (IOException e) {
+            exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e);
+        }
+    }
+    /**
+     * Renames file for backup.
+     *
+     * @param file File.
+     * @return File.
+     */
+    private static File renameToBak(File file) {
+        DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
+        if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + 
"." + fmt.format(new Date()) + ".bak")))
+            exit("Failed to rename file '" + file.getPath() + "'.", null);
+        return file;
+    }
+    /**
+     * Checks if link is correct.
+     *
+     * @param link Symbolic link.
+     * @param correctTarget Correct link target.
+     * @return {@code true} If link target is correct.
+     */
+    private static boolean isJarLinkCorrect(File link, File correctTarget) {
+        if (!Files.isSymbolicLink(link.toPath()))
+            return false; // It is a real file or it does not exist.
+        Path target = null;
+        try {
+            target = Files.readSymbolicLink(link.toPath());
+        }
+        catch (IOException e) {
+            exit("Failed to read symbolic link: " + link.getAbsolutePath(), e);
+        }
+        return Files.exists(target) && target.toFile().equals(correctTarget);
+    }
+    /**
+     * Writes the question end read the boolean answer from the console.
+     *
+     * @param question Question to write.
+     * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise.
+     */
+    private static boolean ask(String question) {
+        X.println();
+        X.print(" <  " + question + " (Y/N): ");
+        String answer = null;
+        if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES")))
+            answer = "Y";
+        else {
+            BufferedReader br = new BufferedReader(new 
+            try {
+                answer = br.readLine();
+            }
+            catch (IOException e) {
+                exit("Failed to read answer: " + e.getMessage(), e);
+            }
+        }
+        if (answer != null && "Y".equals(answer.toUpperCase().trim())) {
+            X.println(" >  Yes.");
+            return true;
+        }
+        else {
+            X.println(" >  No.");
+            return false;
+        }
+    }
+    /**
+     * Exit with message.
+     *
+     * @param msg Exit message.
+     */
+    private static void exit(String msg, Exception e) {
+        X.println("    ");
+        X.println("  # " + msg);
+        X.println("  # Setup failed, exiting... ");
+        if (e != null && 
+            e.printStackTrace();
+        System.exit(1);
+    }
+    /**
+     * Prints message.
+     *
+     * @param msg Message.
+     */
+    private static void println(String msg) {
+        X.println("  > " + msg);
+    }
+    /**
+     * Prints warning.
+     *
+     * @param msg Message.
+     */
+    private static void warn(String msg) {
+        X.println("  ! " + msg);
+    }
+    /**
+     * Checks that CMD files have valid MS Windows new line characters. If 
not, writes question to console and reads the
+     * answer. If it's 'Y' then backups original files and corrects invalid 
new line characters.
+     *
+     * @param rootDir Root directory to process.
+     * @param dirs Directories inside of the root to process.
+     */
+    private static void processCmdFiles(File rootDir, String... dirs) {
+        boolean answer = false;
+        for (String dir : dirs) {
+            File subDir = new File(rootDir, dir);
+            File[] cmdFiles = subDir.listFiles(new FilenameFilter() {
+                @Override public boolean accept(File dir, String name) {
+                    return name.toLowerCase().endsWith(".cmd");
+                }
+            });
+            for (File file : cmdFiles) {
+                String content = null;
+                try (Scanner scanner = new Scanner(file)) {
+                    content = scanner.useDelimiter("\\Z").next();
+                }
+                catch (FileNotFoundException e) {
+                    exit("Failed to read file '" + file + "'.", e);
+                }
+                boolean invalid = false;
+                for (int i = 0; i < content.length(); i++) {
+                    if (content.charAt(i) == '\n' && (i == 0 || 
content.charAt(i - 1) != '\r')) {
+                        invalid = true;
+                        break;
+                    }
+                }
+                if (invalid) {
+                    answer = answer || ask("One or more *.CMD files has 
invalid new line character. Replace them?");
+                    if (!answer) {
+                        println("Ok. But Windows most probably will fail to 
execute them...");
+                        return;
+                    }
+                    println("Fixing newline characters in file '" + 
file.getAbsolutePath() + "'.");
+                    renameToBak(file);
+                    try (BufferedWriter writer = new BufferedWriter(new 
FileWriter(file))) {
+                        for (int i = 0; i < content.length(); i++) {
+                            if (content.charAt(i) == '\n' && (i == 0 || 
content.charAt(i - 1) != '\r'))
+                                writer.write("\r");
+                            writer.write(content.charAt(i));
+                        }
+                    }
+                    catch (IOException e) {
+                        exit("Failed to write file '" + file.getPath() + "': " 
+ e.getMessage(), e);
+                    }
+                }
+            }
+        }
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..1dc8674
--- /dev/null
@@ -0,0 +1,35 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.IgniteException;
+ * Exception that throws when the task is cancelling.
+ */
+public class HadoopTaskCancelledException extends IgniteException {
+    /** */
+    private static final long serialVersionUID = 0L;
+    /**
+     * @param msg Exception message.
+     */
+    public HadoopTaskCancelledException(String msg) {
+        super(msg);
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..65d9810
--- /dev/null
@@ -0,0 +1,443 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+ * Hadoop utility methods.
+ */
+public class HadoopUtils {
+    /** Property to store timestamp of new job id request. */
+    public static final String REQ_NEW_JOBID_TS_PROPERTY = 
+    /** Property to store timestamp of response of new job id request. */
+    public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = 
+    /** Property to store timestamp of job submission. */
+    public static final String JOB_SUBMISSION_START_TS_PROPERTY = 
+    /** Property to set custom writer of job statistics. */
+    public static final String JOB_COUNTER_WRITER_PROPERTY = 
+    /** Staging constant. */
+    private static final String STAGING_CONSTANT = ".staging";
+    /** Old mapper class attribute. */
+    private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class";
+    /** Old reducer class attribute. */
+    private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
+    /**
+     * Constructor.
+     */
+    private HadoopUtils() {
+        // No-op.
+    }
+    /**
+     * Wraps native split.
+     *
+     * @param id Split ID.
+     * @param split Split.
+     * @param hosts Hosts.
+     * @throws IOException If failed.
+     */
+    public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] 
hosts) throws IOException {
+        ByteArrayOutputStream arr = new ByteArrayOutputStream();
+        ObjectOutput out = new ObjectOutputStream(arr);
+        assert split instanceof Writable;
+        ((Writable)split).write(out);
+        out.flush();
+        return new HadoopSplitWrapper(id, split.getClass().getName(), 
arr.toByteArray(), hosts);
+    }
+    /**
+     * Unwraps native split.
+     *
+     * @param o Wrapper.
+     * @return Split.
+     */
+    public static Object unwrapSplit(HadoopSplitWrapper o) {
+        try {
+            Writable w = 
+            w.readFields(new ObjectInputStream(new 
+            return w;
+        }
+        catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+    /**
+     * Convert Ignite job status to Hadoop job status.
+     *
+     * @param status Ignite job status.
+     * @return Hadoop job status.
+     */
+    public static JobStatus status(HadoopJobStatus status, Configuration conf) 
+        JobID jobId = new JobID(status.jobId().globalId().toString(), 
+        float setupProgress = 0;
+        float mapProgress = 0;
+        float reduceProgress = 0;
+        float cleanupProgress = 0;
+        JobStatus.State state = JobStatus.State.RUNNING;
+        switch (status.jobPhase()) {
+            case PHASE_SETUP:
+                setupProgress = 0.42f;
+                break;
+            case PHASE_MAP:
+                setupProgress = 1;
+                mapProgress = 1f - status.pendingMapperCnt() / 
+                break;
+            case PHASE_REDUCE:
+                setupProgress = 1;
+                mapProgress = 1;
+                if (status.totalReducerCnt() > 0)
+                    reduceProgress = 1f - status.pendingReducerCnt() / 
+                else
+                    reduceProgress = 1f;
+                break;
+            case PHASE_CANCELLING:
+            case PHASE_COMPLETE:
+                if (!status.isFailed()) {
+                    setupProgress = 1;
+                    mapProgress = 1;
+                    reduceProgress = 1;
+                    cleanupProgress = 1;
+                    state = JobStatus.State.SUCCEEDED;
+                }
+                else
+                    state = JobStatus.State.FAILED;
+                break;
+            default:
+                assert false;
+        }
+        return new JobStatus(jobId, setupProgress, mapProgress, 
reduceProgress, cleanupProgress, state,
+            JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, 
status.user(), jobId).toString(), "N/A");
+    }
+    /**
+     * Gets staging area directory.
+     *
+     * @param conf Configuration.
+     * @param usr User.
+     * @return Staging area directory.
+     */
+    public static Path stagingAreaDir(Configuration conf, String usr) {
+        return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, 
+            + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT);
+    }
+    /**
+     * Gets job file.
+     *
+     * @param conf Configuration.
+     * @param usr User.
+     * @param jobId Job ID.
+     * @return Job file.
+     */
+    public static Path jobFile(Configuration conf, String usr, JobID jobId) {
+        return new Path(stagingAreaDir(conf, usr), jobId.toString() + 
+    }
+    /**
+     * Checks the attribute in configuration is not set.
+     *
+     * @param attr Attribute name.
+     * @param msg Message for creation of exception.
+     * @throws IgniteCheckedException If attribute is set.
+     */
+    public static void ensureNotSet(Configuration cfg, String attr, String 
msg) throws IgniteCheckedException {
+        if (cfg.get(attr) != null)
+            throw new IgniteCheckedException(attr + " is incompatible with " + 
msg + " mode.");
+    }
+    /**
+     * Creates JobInfo from hadoop configuration.
+     *
+     * @param cfg Hadoop configuration.
+     * @return Job info.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws 
IgniteCheckedException {
+        JobConf jobConf = new JobConf(cfg);
+        boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
+                || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null;
+        int numReduces = jobConf.getNumReduceTasks();
+        jobConf.setBooleanIfUnset("", 
jobConf.get(OLD_MAP_CLASS_ATTR) == null);
+        if (jobConf.getUseNewMapper()) {
+            String mode = "new map API";
+            ensureNotSet(jobConf, "mapred.input.format.class", mode);
+            ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode);
+            if (numReduces != 0)
+                ensureNotSet(jobConf, "mapred.partitioner.class", mode);
+            else
+                ensureNotSet(jobConf, "mapred.output.format.class", mode);
+        }
+        else {
+            String mode = "map compatibility";
+            ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
+            ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode);
+            if (numReduces != 0)
+                ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, 
+            else
+                ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, 
+        }
+        if (numReduces != 0) {
+            jobConf.setBooleanIfUnset("", 
jobConf.get(OLD_REDUCE_CLASS_ATTR) == null);
+            if (jobConf.getUseNewReducer()) {
+                String mode = "new reduce API";
+                ensureNotSet(jobConf, "mapred.output.format.class", mode);
+                ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode);
+            }
+            else {
+                String mode = "reduce compatibility";
+                ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, 
+                ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
+            }
+        }
+        Map<String, String> props = new HashMap<>();
+        for (Map.Entry<String, String> entry : jobConf)
+            props.put(entry.getKey(), entry.getValue());
+        return new HadoopDefaultJobInfo(jobConf.getJobName(), 
jobConf.getUser(), hasCombiner, numReduces, props);
+    }
+    /**
+     * Throws new {@link IgniteCheckedException} with original exception is 
serialized into string.
+     * This is needed to transfer error outside the current class loader.
+     *
+     * @param e Original exception.
+     * @return IgniteCheckedException New exception.
+     */
+    public static IgniteCheckedException transformException(Throwable e) {
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        e.printStackTrace(new PrintStream(os, true));
+        return new IgniteCheckedException(os.toString());
+    }
+    /**
+     * Returns work directory for job execution.
+     *
+     * @param locNodeId Local node ID.
+     * @param jobId Job ID.
+     * @return Working directory for job.
+     * @throws IgniteCheckedException If Failed.
+     */
+    public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws 
IgniteCheckedException {
+        return new File(new File(U.resolveWorkDirectory("hadoop", false), 
"node-" + locNodeId), "job_" + jobId);
+    }
+    /**
+     * Returns subdirectory of job working directory for task execution.
+     *
+     * @param locNodeId Local node ID.
+     * @param info Task info.
+     * @return Working directory for task.
+     * @throws IgniteCheckedException If Failed.
+     */
+    public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) 
throws IgniteCheckedException {
+        File jobLocDir = jobLocalDir(locNodeId, info.jobId());
+        return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" 
+ info.attempt());
+    }
+    /**
+     * Creates {@link Configuration} in a correct class loader context to 
avoid caching
+     * of inappropriate class loader in the Configuration object.
+     * @return New instance of {@link Configuration}.
+     */
+    public static Configuration safeCreateConfiguration() {
+        final ClassLoader oldLdr = 
+        try {
+            return new Configuration();
+        }
+        finally {
+            restoreContextClassLoader(oldLdr);
+        }
+    }
+    /**
+     * Sort input splits by length.
+     *
+     * @param splits Splits.
+     * @return Sorted splits.
+     */
+    public static List<HadoopInputSplit> 
sortInputSplits(Collection<HadoopInputSplit> splits) {
+        int id = 0;
+        TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>();
+        for (HadoopInputSplit split : splits) {
+            long len = split instanceof HadoopFileBlock ? 
((HadoopFileBlock)split).length() : 0;
+            sortedSplits.add(new SplitSortWrapper(id++, split, len));
+        }
+        ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size());
+        for (SplitSortWrapper sortedSplit : sortedSplits)
+            res.add(sortedSplit.split);
+        return res;
+    }
+    /**
+     * Set context class loader.
+     *
+     * @param newLdr New class loader.
+     * @return Old class loader.
+     */
+    @Nullable public static ClassLoader setContextClassLoader(@Nullable 
ClassLoader newLdr) {
+        ClassLoader oldLdr = Thread.currentThread().getContextClassLoader();
+        if (newLdr != oldLdr)
+            Thread.currentThread().setContextClassLoader(newLdr);
+        return oldLdr;
+    }
+    /**
+     * Restore context class loader.
+     *
+     * @param oldLdr Original class loader.
+     */
+    public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) 
+        ClassLoader newLdr = Thread.currentThread().getContextClassLoader();
+        if (newLdr != oldLdr)
+            Thread.currentThread().setContextClassLoader(oldLdr);
+    }
+    /**
+     * Split wrapper for sorting.
+     */
+    private static class SplitSortWrapper implements 
Comparable<SplitSortWrapper> {
+        /** Unique ID. */
+        private final int id;
+        /** Split. */
+        private final HadoopInputSplit split;
+        /** Split length. */
+        private final long len;
+        /**
+         * Constructor.
+         *
+         * @param id Unique ID.
+         * @param split Split.
+         * @param len Split length.
+         */
+        public SplitSortWrapper(int id, HadoopInputSplit split, long len) {
+   = id;
+            this.split = split;
+            this.len = len;
+        }
+        /** {@inheritDoc} */
+        @SuppressWarnings("NullableProblems")
+        @Override public int compareTo(SplitSortWrapper other) {
+            assert other != null;
+            long res = len - other.len;
+            if (res > 0)
+                return -1;
+            else if (res < 0)
+                return 1;
+            else
+                return id -;
+        }
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof SplitSortWrapper && id == 
+        }
+    }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..3f682d3
--- /dev/null
@@ -0,0 +1,129 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.counter;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+ * Default Hadoop counter implementation.
+ */
+public abstract class HadoopCounterAdapter implements HadoopCounter, 
Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+    /** Counter group name. */
+    private String grp;
+    /** Counter name. */
+    private String name;
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    protected HadoopCounterAdapter() {
+        // No-op.
+    }
+    /**
+     * Creates new counter with given group and name.
+     *
+     * @param grp Counter group name.
+     * @param name Counter name.
+     */
+    protected HadoopCounterAdapter(String grp, String name) {
+        assert grp != null : "counter must have group";
+        assert name != null : "counter must have name";
+        this.grp = grp;
+ = name;
+    }
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+    /** {@inheritDoc} */
+    @Override @Nullable public String group() {
+        return grp;
+    }
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeUTF(grp);
+        out.writeUTF(name);
+        writeValue(out);
+    }
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        grp = in.readUTF();
+        name = in.readUTF();
+        readValue(in);
+    }
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        HadoopCounterAdapter cntr = (HadoopCounterAdapter)o;
+        if (!grp.equals(cntr.grp))
+            return false;
+        if (!name.equals(
+            return false;
+        return true;
+    }
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = grp.hashCode();
+        res = 31 * res + name.hashCode();
+        return res;
+    }
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopCounterAdapter.class, this);
+    }
+    /**
+     * Writes value of this counter to output.
+     *
+     * @param out Output.
+     * @throws IOException If failed.
+     */
+    protected abstract void writeValue(ObjectOutput out) throws IOException;
+    /**
+     * Read value of this counter from input.
+     *
+     * @param in Input.
+     * @throws IOException If failed.
+     */
+    protected abstract void readValue(ObjectInput in) throws IOException;
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..f3b5463
--- /dev/null
@@ -0,0 +1,200 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.counter;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jsr166.ConcurrentHashMap8;
+ * Default in-memory counters store.
+ */
+public class HadoopCountersImpl implements HadoopCounters, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+    /** */
+    private final ConcurrentMap<CounterKey, HadoopCounter> cntrsMap = new 
+    /**
+     * Default constructor. Creates new instance without counters.
+     */
+    public HadoopCountersImpl() {
+        // No-op.
+    }
+    /**
+     * Creates new instance that contain given counters.
+     *
+     * @param cntrs Counters to store.
+     */
+    public HadoopCountersImpl(Iterable<HadoopCounter> cntrs) {
+        addCounters(cntrs, true);
+    }
+    /**
+     * Copy constructor.
+     *
+     * @param cntrs Counters to copy.
+     */
+    public HadoopCountersImpl(HadoopCounters cntrs) {
+        this(cntrs.all());
+    }
+    /**
+     * Creates counter instance.
+     *
+     * @param cls Class of the counter.
+     * @param grp Group name.
+     * @param name Counter name.
+     * @return Counter.
+     */
+    private <T extends HadoopCounter> T createCounter(Class<? extends 
HadoopCounter> cls, String grp,
+        String name) {
+        try {
+            Constructor constructor = cls.getConstructor(String.class, 
+            return (T)constructor.newInstance(grp, name);
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+    /**
+     * Adds counters collection in addition to existing counters.
+     *
+     * @param cntrs Counters to add.
+     * @param cp Whether to copy counters or not.
+     */
+    private void addCounters(Iterable<HadoopCounter> cntrs, boolean cp) {
+        assert cntrs != null;
+        for (HadoopCounter cntr : cntrs) {
+            if (cp) {
+                HadoopCounter cntrCp = createCounter(cntr.getClass(),,;
+                cntrCp.merge(cntr);
+                cntr = cntrCp;
+            }
+            cntrsMap.put(new CounterKey(cntr.getClass(),,, cntr);
+        }
+    }
+    /** {@inheritDoc} */
+    @Override public <T extends HadoopCounter> T counter(String grp, String 
name, Class<T> cls) {
+        assert cls != null;
+        CounterKey mapKey = new CounterKey(cls, grp, name);
+        T cntr = (T)cntrsMap.get(mapKey);
+        if (cntr == null) {
+            cntr = createCounter(cls, grp, name);
+            T old = (T)cntrsMap.putIfAbsent(mapKey, cntr);
+            if (old != null)
+                return old;
+        }
+        return cntr;
+    }
+    /** {@inheritDoc} */
+    @Override public Collection<HadoopCounter> all() {
+        return cntrsMap.values();
+    }
+    /** {@inheritDoc} */
+    @Override public void merge(HadoopCounters other) {
+        for (HadoopCounter counter : other.all())
+            counter(,, 
+    }
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeCollection(out, cntrsMap.values());
+    }
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        addCounters(U.<HadoopCounter>readCollection(in), false);
+    }
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        HadoopCountersImpl counters = (HadoopCountersImpl)o;
+        return cntrsMap.equals(counters.cntrsMap);
+    }
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return cntrsMap.hashCode();
+    }
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopCountersImpl.class, this, "counters", 
+    }
+    /**
+     * The tuple of counter identifier components for more readable code.
+     */
+    private static class CounterKey extends GridTuple3<Class<? extends 
HadoopCounter>, String, String> {
+        /** */
+        private static final long serialVersionUID = 0L;
+        /**
+         * Constructor.
+         *
+         * @param cls Class of the counter.
+         * @param grp Group name.
+         * @param name Counter name.
+         */
+        private CounterKey(Class<? extends HadoopCounter> cls, String grp, 
String name) {
+            super(cls, grp, name);
+        }
+        /**
+         * Empty constructor required by {@link Externalizable}.
+         */
+        public CounterKey() {
+            // No-op.
+        }
+    }
\ No newline at end of file

Reply via email to