http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
new file mode 100644
index 0000000..37f81a6
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop output collector.
+ */
+public class HadoopV1OutputCollector implements OutputCollector {
+    /** Job configuration. */
+    private final JobConf jobConf;
+
+    /** Task context. */
+    private final HadoopTaskContext taskCtx;
+
+    /** Optional direct writer. */
+    private final RecordWriter writer;
+
+    /** Task attempt. */
+    private final TaskAttemptID attempt;
+
+    /**
+     * @param jobConf Job configuration.
+     * @param taskCtx Task context.
+     * @param directWrite Direct write flag.
+     * @param fileName File name.
+     * @param attempt Task attempt ID.
+     * @throws IOException In case of IO exception.
+     */
+    HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
+        @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+        this.jobConf = jobConf;
+        this.taskCtx = taskCtx;
+        this.attempt = attempt;
+
+        if (directWrite) {
+            jobConf.set("mapreduce.task.attempt.id", attempt.toString());
+
+            OutputFormat outFormat = jobConf.getOutputFormat();
+
+            writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
+        }
+        else
+            writer = null;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void collect(Object key, Object val) throws IOException {
+        if (writer != null)
+            writer.write(key, val);
+        else {
+            try {
+                taskCtx.output().write(key, val);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    /**
+     * Close writer.
+     *
+     * @throws IOException In case of IO exception.
+     */
+    public void closeWriter() throws IOException {
+        if (writer != null)
+            writer.close(Reporter.NULL);
+    }
+
+    /**
+     * Setup task.
+     *
+     * @throws IOException If failed.
+     */
+    public void setup() throws IOException {
+        if (writer != null)
+            jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
+    }
+
+    /**
+     * Commit task.
+     *
+     * @throws IOException If failed.
+     */
+    public void commit() throws IOException {
+        if (writer != null) {
+            OutputCommitter outputCommitter = jobConf.getOutputCommitter();
+
+            TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
+
+            if (outputCommitter.needsTaskCommit(taskCtx))
+                outputCommitter.commitTask(taskCtx);
+        }
+    }
+
+    /**
+     * Abort task.
+     */
+    public void abort() {
+        try {
+            if (writer != null)
+                jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
+        }
+        catch (IOException ignore) {
+            // No-op.
+        }
+    }
+}
\ No newline at end of file
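
A note on usage: this collector is driven through a setup/collect/commit lifecycle by the task classes later in this commit. A minimal sketch of that flow, from within the same package (the constructor is package-private); the JobConf, task context, attempt ID and output format configuration are assumed to come from the caller, and imports match the file above:

    static void runCollectorLifecycle(JobConf jobConf, HadoopTaskContext taskCtx,
        TaskAttemptID attempt, Object key, Object val) throws IOException {
        HadoopV1OutputCollector collector =
            new HadoopV1OutputCollector(jobConf, taskCtx, true, "part-00000", attempt);

        collector.setup();               // OutputCommitter.setupTask(...).

        try {
            collector.collect(key, val); // Direct write goes straight to the RecordWriter.

            collector.closeWriter();
            collector.commit();          // commitTask(...) only if needsTaskCommit(...).
        }
        catch (IOException e) {
            collector.abort();           // abortTask(...); the abort itself swallows IOException.

            throw e;
        }
    }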

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
new file mode 100644
index 0000000..0ab1bba
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+
+/**
+ * Hadoop partitioner adapter for v1 API.
+ */
+public class HadoopV1Partitioner implements HadoopPartitioner {
+    /** Partitioner instance. */
+    private Partitioner<Object, Object> part;
+
+    /**
+     * @param cls Hadoop partitioner class.
+     * @param conf Job configuration.
+     */
+    public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) {
+        part = (Partitioner<Object, Object>)ReflectionUtils.newInstance(cls, conf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition(Object key, Object val, int parts) {
+        return part.getPartition(key, val, parts);
+    }
+}
\ No newline at end of file
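
A small usage sketch, assuming Hadoop's stock org.apache.hadoop.mapred.lib.HashPartitioner; any v1 Partitioner class works the same way, since the adapter just delegates getPartition():

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.HashPartitioner;

    public class PartitionerExample {
        public static void main(String[] args) {
            HadoopV1Partitioner part = new HadoopV1Partitioner(HashPartitioner.class, new JobConf());

            // Ask for a partition the way the shuffle would; the result is always in [0, 8).
            int p = part.partition(new Text("some-key"), new Text("some-value"), 8);

            System.out.println("partition = " + p);
        }
    }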

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
new file mode 100644
index 0000000..e656695
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
+
+/**
+ * Hadoop reduce task implementation for v1 API.
+ */
+public class HadoopV1ReduceTask extends HadoopV1Task {
+    /** {@code True} if reduce, {@code false} if combine. */
+    private final boolean reduce;
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     * @param reduce {@code True} if reduce, {@code false} if combine.
+     */
+    public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) {
+        super(taskInfo);
+
+        this.reduce = reduce;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        HadoopJob job = taskCtx.job();
+
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        JobConf jobConf = ctx.jobConf();
+
+        HadoopTaskInput input = taskCtx.input();
+
+        HadoopV1OutputCollector collector = null;
+
+        try {
+            collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+
+            Reducer reducer;
+
+            if (reduce)
+                reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(), jobConf);
+            else
+                reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(), jobConf);
+
+            assert reducer != null;
+
+            try {
+                try {
+                    while (input.next()) {
+                        if (isCancelled())
+                            throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+                        reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                    }
+                }
+                finally {
+                    reducer.close();
+                }
+            }
+            finally {
+                collector.closeWriter();
+            }
+
+            collector.commit();
+        }
+        catch (Exception e) {
+            if (collector != null)
+                collector.abort();
+
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file
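
The nested try/finally blocks are the load-bearing part of run(): reducer.close() and collector.closeWriter() are guaranteed to run, in that order, even when reduce() throws, and commit() is reached only if everything before it succeeded. The same pattern in isolation (a sketch; the names reuse the locals above):

    try {
        try {
            try {
                // ... consume the input, possibly throwing ...
            }
            finally {
                reducer.close();     // Always runs first.
            }
        }
        finally {
            collector.closeWriter(); // Then the writer is closed.
        }

        collector.commit();          // Reached only when no exception escaped.
    }
    catch (Exception e) {
        collector.abort();           // Any failure aborts the task attempt.

        throw new IgniteCheckedException(e);
    }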

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
new file mode 100644
index 0000000..5a63aab
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+
+/**
+ * Hadoop reporter implementation for v1 API.
+ */
+public class HadoopV1Reporter implements Reporter {
+    /** Context. */
+    private final HadoopTaskContext ctx;
+
+    /**
+     * Creates new instance.
+     *
+     * @param ctx Context.
+     */
+    public HadoopV1Reporter(HadoopTaskContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setStatus(String status) {
+        // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counters.Counter getCounter(Enum<?> name) {
+        return getCounter(name.getDeclaringClass().getName(), name.name());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counters.Counter getCounter(String grp, String name) {
+        return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrCounter(Enum<?> key, long amount) {
+        getCounter(key).increment(amount);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrCounter(String grp, String cntr, long amount) {
+        getCounter(grp, cntr).increment(amount);
+    }
+
+    /** {@inheritDoc} */
+    @Override public InputSplit getInputSplit() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("reporter has no input"); // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getProgress() {
+        return 0.5f; // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override public void progress() {
+        // TODO
+    }
+}
\ No newline at end of file
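
Only the counter methods of this reporter do real work: they resolve an Ignite HadoopLongCounter and expose it to user code through the v1 Counter interface (via HadoopV1Counter). A usage sketch, assuming a live task context and a hypothetical user enum:

    enum MyCounters { BAD_RECORDS } // Hypothetical; any Enum works as a counter key.

    Reporter rep = new HadoopV1Reporter(taskCtx);

    rep.incrCounter(MyCounters.BAD_RECORDS, 1);            // Enum-based lookup.
    rep.getCounter("my-group", "my-counter").increment(5); // Group/name-based lookup.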

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
new file mode 100644
index 0000000..d2f6823
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
+
+/**
+ * Hadoop setup task implementation for v1 API.
+ */
+public class HadoopV1SetupTask extends HadoopV1Task {
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    public HadoopV1SetupTask(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        try {
+            ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf());
+
+            OutputCommitter committer = ctx.jobConf().getOutputCommitter();
+
+            if (committer != null)
+                committer.setupJob(ctx.jobContext());
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
new file mode 100644
index 0000000..203def4
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop API v1 splitter.
+ */
+public class HadoopV1Splitter {
+    /** */
+    private static final String[] EMPTY_HOSTS = {};
+
+    /**
+     * @param jobConf Job configuration.
+     * @return Collection of mapped splits.
+     * @throws IgniteCheckedException If mapping failed.
+     */
+    public static Collection<HadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException {
+        try {
+            InputFormat<?, ?> format = jobConf.getInputFormat();
+
+            assert format != null;
+
+            InputSplit[] splits = format.getSplits(jobConf, 0);
+
+            Collection<HadoopInputSplit> res = new ArrayList<>(splits.length);
+
+            for (int i = 0; i < splits.length; i++) {
+                InputSplit nativeSplit = splits[i];
+
+                if (nativeSplit instanceof FileSplit) {
+                    FileSplit s = (FileSplit)nativeSplit;
+
+                    res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
+                }
+                else
+                    res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
+            }
+
+            return res;
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * @param clsName Input split class name.
+     * @param in Input stream.
+     * @param hosts Optional hosts.
+     * @return File block or {@code null} if it is not a {@link FileSplit} instance.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
+        @Nullable String[] hosts) throws IgniteCheckedException {
+        if (!FileSplit.class.getName().equals(clsName))
+            return null;
+
+        FileSplit split = U.newInstance(FileSplit.class);
+
+        try {
+            split.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        if (hosts == null)
+            hosts = EMPTY_HOSTS;
+
+        return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
+    }
+}
\ No newline at end of file
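
A sketch of how splitJob() is typically fed, assuming a plain-text input under a hypothetical HDFS path; only standard mapred API calls are used (org.apache.hadoop.fs.Path, org.apache.hadoop.mapred.FileInputFormat, TextInputFormat, plus the imports of the file above):

    static Collection<HadoopInputSplit> exampleSplits() throws IgniteCheckedException {
        JobConf jobConf = new JobConf();

        jobConf.setInputFormat(TextInputFormat.class);
        FileInputFormat.setInputPaths(jobConf, new Path("hdfs://example:9000/input"));

        // FileSplits come back as HadoopFileBlock; any other split type is wrapped.
        return HadoopV1Splitter.splitJob(jobConf);
    }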

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
new file mode 100644
index 0000000..a89323c
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.ignite.internal.processors.hadoop.HadoopTask;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extended Hadoop v1 task.
+ */
+public abstract class HadoopV1Task extends HadoopTask {
+    /** Indicates that this task is to be cancelled. */
+    private volatile boolean cancelled;
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    protected HadoopV1Task(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /**
+     * Gets the file name for this task's result.
+     *
+     * @return File name.
+     */
+    public String fileName() {
+        NumberFormat numFormat = NumberFormat.getInstance();
+
+        numFormat.setMinimumIntegerDigits(5);
+        numFormat.setGroupingUsed(false);
+
+        return "part-" + numFormat.format(info().taskNumber());
+    }
+
+    /**
+     *
+     * @param jobConf Job configuration.
+     * @param taskCtx Task context.
+     * @param directWrite Direct write flag.
+     * @param fileName File name.
+     * @param attempt Attempt of task.
+     * @return Collector.
+     * @throws IOException In case of IO exception.
+     */
+    protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx,
+        boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+        HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite,
+            fileName, attempt) {
+            /** {@inheritDoc} */
+            @Override public void collect(Object key, Object val) throws IOException {
+                if (cancelled)
+                    throw new HadoopTaskCancelledException("Task cancelled.");
+
+                super.collect(key, val);
+            }
+        };
+
+        collector.setup();
+
+        return collector;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        cancelled = true;
+    }
+
+    /**
+     * @return {@code True} if the task is cancelled.
+     */
+    public boolean isCancelled() {
+        return cancelled;
+    }
+}
\ No newline at end of file
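
The NumberFormat in fileName() pads the task number to at least five digits with grouping disabled, which matches Hadoop's part-file naming convention. A quick standalone check of that formatting:

    NumberFormat numFormat = NumberFormat.getInstance();

    numFormat.setMinimumIntegerDigits(5);
    numFormat.setGroupingUsed(false);

    System.out.println("part-" + numFormat.format(42));     // part-00042
    System.out.println("part-" + numFormat.format(123456)); // part-123456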

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
new file mode 100644
index 0000000..9632525
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * Replacement for Hadoop {@code org.apache.hadoop.util.Daemon} class.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class HadoopDaemon extends Thread {
+    /** Lock object used for synchronization. */
+    private static final Object lock = new Object();
+
+    /** Collection to hold the threads to be stopped. */
+    private static Collection<HadoopDaemon> daemons = new LinkedList<>();
+
+    {
+        setDaemon(true); // always a daemon
+    }
+
+    /** Runnable of this thread, may be this. */
+    final Runnable runnable;
+
+    /**
+     * Construct a daemon thread.
+     */
+    public HadoopDaemon() {
+        super();
+
+        runnable = this;
+
+        enqueueIfNeeded();
+    }
+
+    /**
+     * Construct a daemon thread.
+     */
+    public HadoopDaemon(Runnable runnable) {
+        super(runnable);
+
+        this.runnable = runnable;
+
+        this.setName(runnable.toString());
+
+        enqueueIfNeeded();
+    }
+
+    /**
+     * Construct a daemon thread to be part of a specified thread group.
+     */
+    public HadoopDaemon(ThreadGroup grp, Runnable runnable) {
+        super(grp, runnable);
+
+        this.runnable = runnable;
+
+        this.setName(runnable.toString());
+
+        enqueueIfNeeded();
+    }
+
+    /**
+     * Getter for the runnable. May return this.
+     *
+     * @return the runnable
+     */
+    public Runnable getRunnable() {
+        return runnable;
+    }
+
+    /**
+     * Checks whether the runnable is a Hadoop {@code org.apache.hadoop.hdfs.PeerCache} runnable.
+     *
+     * @param r The runnable.
+     * @return {@code True} if it is.
+     */
+    private static boolean isPeerCacheRunnable(Runnable r) {
+        String name = r.getClass().getName();
+
+        return name.startsWith("org.apache.hadoop.hdfs.PeerCache");
+    }
+
+    /**
+     * Enqueues this thread if it should be stopped when the task ends.
+     */
+    private void enqueueIfNeeded() {
+        synchronized (lock) {
+            if (daemons == null)
+                throw new RuntimeException("Failed to create HadoopDaemon (its registry is already cleared): " +
+                    "[classLoader=" + getClass().getClassLoader() + ']');
+
+            if (runnable.getClass().getClassLoader() == getClass().getClassLoader() && isPeerCacheRunnable(runnable))
+                daemons.add(this);
+        }
+    }
+
+    /**
+     * Stops all the registered threads.
+     */
+    public static void dequeueAndStopAll() {
+        synchronized (lock) {
+            if (daemons != null) {
+                for (HadoopDaemon daemon : daemons)
+                    daemon.interrupt();
+
+                daemons = null;
+            }
+        }
+    }
+}
\ No newline at end of file
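
To make the intent concrete: this class shadows Hadoop's Daemon inside the job class loader, and only threads whose runnable is an org.apache.hadoop.hdfs.PeerCache are registered for forced shutdown. A sketch (the runnable here is a stand-in, so it would not actually be registered; a real PeerCache is matched by class name):

    HadoopDaemon d = new HadoopDaemon(new Runnable() {
        @Override public void run() {
            // ... background work ...
        }

        @Override public String toString() {
            return "example-daemon"; // Used as the thread name.
        }
    });

    d.start();

    // At task end, every registered PeerCache thread is interrupted and the
    // registry is cleared, so later constructions fail fast:
    HadoopDaemon.dequeueAndStopAll();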

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java
new file mode 100644
index 0000000..c7e8a0a
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+
+/**
+ * Split serialized in external file.
+ */
+public class HadoopExternalSplit extends HadoopInputSplit {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long off;
+
+    /**
+     * For {@link Externalizable}.
+     */
+    public HadoopExternalSplit() {
+        // No-op.
+    }
+
+    /**
+     * @param hosts Hosts.
+     * @param off Offset of this split in external file.
+     */
+    public HadoopExternalSplit(String[] hosts, long off) {
+        assert off >= 0 : off;
+        assert hosts != null;
+
+        this.hosts = hosts;
+        this.off = off;
+    }
+
+    /**
+     * @return Offset of this input split in external file.
+     */
+    public long offset() {
+        return off;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(off);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        off = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        HadoopExternalSplit that = (HadoopExternalSplit) o;
+
+        return off == that.off;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return (int)(off ^ (off >>> 32));
+    }
+}
\ No newline at end of file
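
Note that writeExternal() serializes only the offset; the hosts stay node-local, and equality and hashing are offset-only as well. A round-trip sketch (java.io imports assumed):

    HadoopExternalSplit split = new HadoopExternalSplit(new String[] {"host1"}, 1024L);

    ByteArrayOutputStream bout = new ByteArrayOutputStream();

    try (ObjectOutputStream out = new ObjectOutputStream(bout)) {
        split.writeExternal(out); // Writes the offset only.
    }

    HadoopExternalSplit restored = new HadoopExternalSplit();

    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bout.toByteArray()))) {
        restored.readExternal(in);
    }

    assert restored.offset() == 1024L && restored.equals(split); // Hosts are not compared.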

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java
new file mode 100644
index 0000000..844e7f8
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The wrapper around external serializer.
+ */
+public class HadoopSerializationWrapper<T> implements HadoopSerialization {
+    /** External serializer - writer. */
+    private final Serializer<T> serializer;
+
+    /** External serializer - reader. */
+    private final Deserializer<T> deserializer;
+
+    /** Data output for current write operation. */
+    private OutputStream currOut;
+
+    /** Data input for current read operation. */
+    private InputStream currIn;
+
+    /** Wrapper around current output to provide OutputStream interface. */
+    private final OutputStream outStream = new OutputStream() {
+        /** {@inheritDoc} */
+        @Override public void write(int b) throws IOException {
+            currOut.write(b);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(byte[] b, int off, int len) throws IOException {
+            currOut.write(b, off, len);
+        }
+    };
+
+    /** Wrapper around current input to provide InputStream interface. */
+    private final InputStream inStream = new InputStream() {
+        /** {@inheritDoc} */
+        @Override public int read() throws IOException {
+            return currIn.read();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int read(byte[] b, int off, int len) throws IOException {
+            return currIn.read(b, off, len);
+        }
+    };
+
+    /**
+     * @param serialization External serializer to wrap.
+     * @param cls The class to serialize.
+     * @throws IgniteCheckedException If the serializer failed to open.
+     */
+    public HadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException {
+        assert cls != null;
+
+        serializer = serialization.getSerializer(cls);
+        deserializer = serialization.getDeserializer(cls);
+
+        try {
+            serializer.open(outStream);
+            deserializer.open(inStream);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
+        assert out != null;
+        assert obj != null;
+
+        try {
+            currOut = (OutputStream)out;
+
+            serializer.serialize((T)obj);
+
+            currOut = null;
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
+        assert in != null;
+
+        try {
+            currIn = (InputStream)in;
+
+            T res = deserializer.deserialize((T) obj);
+
+            currIn = null;
+
+            return res;
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteCheckedException {
+        try {
+            serializer.close();
+            deserializer.close();
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file
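
A round-trip sketch using Hadoop's own WritableSerialization with an IntWritable payload (raw types on purpose, as callers of this wrapper use them; note the DataOutput/DataInput arguments must also be Output-/InputStream instances, since write()/read() cast them):

    HadoopSerialization ser =
        new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class);

    ByteArrayOutputStream buf = new ByteArrayOutputStream();

    ser.write(new DataOutputStream(buf), new IntWritable(42));

    IntWritable res = (IntWritable)ser.read(
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), null);

    ser.close(); // res.get() == 42 at this point.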

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java
new file mode 100644
index 0000000..8bd71e0
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Fake manager for shutdown hooks.
+ */
+public class HadoopShutdownHookManager {
+    /** */
+    private static final HadoopShutdownHookManager MGR = new HadoopShutdownHookManager();
+
+    /**
+     * Return <code>ShutdownHookManager</code> singleton.
+     *
+     * @return <code>ShutdownHookManager</code> singleton.
+     */
+    public static HadoopShutdownHookManager get() {
+        return MGR;
+    }
+
+    /** */
+    private Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>());
+
+    /** */
+    private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
+
+    /**
+     * Singleton.
+     */
+    private HadoopShutdownHookManager() {
+        // No-op.
+    }
+
+    /**
+     * Adds a shutdownHook with a priority; the higher the priority,
+     * the earlier it will run. ShutdownHooks with the same priority run
+     * in a non-deterministic order.
+     *
+     * @param shutdownHook shutdownHook <code>Runnable</code>
+     * @param priority priority of the shutdownHook.
+     */
+    public void addShutdownHook(Runnable shutdownHook, int priority) {
+        if (shutdownHook == null)
+            throw new IllegalArgumentException("shutdownHook cannot be NULL");
+
+        hooks.add(shutdownHook);
+    }
+
+    /**
+     * Removes a shutdownHook.
+     *
+     * @param shutdownHook shutdownHook to remove.
+     * @return TRUE if the shutdownHook was registered and removed,
+     * FALSE otherwise.
+     */
+    public boolean removeShutdownHook(Runnable shutdownHook) {
+        return hooks.remove(shutdownHook);
+    }
+
+    /**
+     * Indicates if a shutdownHook is registered or not.
+     *
+     * @param shutdownHook shutdownHook to check if registered.
+     * @return TRUE/FALSE depending on whether the shutdownHook is registered.
+     */
+    public boolean hasShutdownHook(Runnable shutdownHook) {
+        return hooks.contains(shutdownHook);
+    }
+
+    /**
+     * Indicates if shutdown is in progress or not.
+     *
+     * @return TRUE if the shutdown is in progress, otherwise FALSE.
+     */
+    public boolean isShutdownInProgress() {
+        return shutdownInProgress.get();
+    }
+}
\ No newline at end of file
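
This fake manager lets Hadoop code loaded inside a job class loader register shutdown hooks without ever touching the JVM: the hooks are merely remembered, the priority argument is accepted but ignored, and isShutdownInProgress() stays false. A usage sketch:

    Runnable hook = new Runnable() {
        @Override public void run() {
            // Cleanup that will never run automatically here.
        }
    };

    HadoopShutdownHookManager mgr = HadoopShutdownHookManager.get();

    mgr.addShutdownHook(hook, 10);       // The priority is not recorded anywhere.

    assert mgr.hasShutdownHook(hook);
    assert mgr.removeShutdownHook(hook); // True: it was registered.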

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java
new file mode 100644
index 0000000..df77adb
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * The wrapper for native hadoop input splits.
+ *
+ * Warning!! This class must not depend on any Hadoop classes directly or indirectly.
+ */
+public class HadoopSplitWrapper extends HadoopInputSplit {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Native hadoop input split. */
+    private byte[] bytes;
+
+    /** */
+    private String clsName;
+
+    /** Internal ID */
+    private int id;
+
+    /**
+     * Creates new split wrapper.
+     */
+    public HadoopSplitWrapper() {
+        // No-op.
+    }
+
+    /**
+     * Creates new split wrapper.
+     *
+     * @param id Split ID.
+     * @param clsName Class name.
+     * @param bytes Serialized class.
+     * @param hosts Hosts where split is located.
+     */
+    public HadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) {
+        assert hosts != null;
+        assert clsName != null;
+        assert bytes != null;
+
+        this.hosts = hosts;
+        this.id = id;
+
+        this.clsName = clsName;
+        this.bytes = bytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(id);
+
+        out.writeUTF(clsName);
+        U.writeByteArray(out, bytes);
+    }
+
+    /**
+     * @return Class name.
+     */
+    public String className() {
+        return clsName;
+    }
+
+    /**
+     * @return Class bytes.
+     */
+    public byte[] bytes() {
+        return bytes;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        id = in.readInt();
+
+        clsName = in.readUTF();
+        bytes = U.readByteArray(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        HadoopSplitWrapper that = (HadoopSplitWrapper)o;
+
+        return id == that.id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return id;
+    }
+}
\ No newline at end of file
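
The identity of a wrapper is its internal id alone; the class name and bytes are opaque payload that higher-level code (e.g. HadoopUtils.wrapSplit(...) in HadoopV1Splitter above) fills with the serialized native split. A small sketch with placeholder bytes:

    byte[] splitBytes = {1, 2, 3}; // Placeholder; real bytes hold the serialized native split.

    HadoopSplitWrapper w = new HadoopSplitWrapper(7, "org.apache.hadoop.mapred.FileSplit",
        splitBytes, new String[] {"host1", "host2"});

    // equals()/hashCode() look at the id only, not the payload:
    assert w.equals(new HadoopSplitWrapper(7, "other.Class", new byte[0], new String[0]));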

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java
new file mode 100644
index 0000000..abb904c
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+
+/**
+ * Hadoop cleanup task (commits or aborts job).
+ */
+public class HadoopV2CleanupTask extends HadoopV2Task {
+    /** Abort flag. */
+    private final boolean abort;
+
+    /**
+     * @param taskInfo Task info.
+     * @param abort Abort flag.
+     */
+    public HadoopV2CleanupTask(HadoopTaskInfo taskInfo, boolean abort) {
+        super(taskInfo);
+
+        this.abort = abort;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ConstantConditions")
+    @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException {
+        JobContextImpl jobCtx = taskCtx.jobContext();
+
+        try {
+            OutputFormat outputFormat = getOutputFormat(jobCtx);
+
+            OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext());
+
+            if (committer != null) {
+                if (abort)
+                    committer.abortJob(jobCtx, JobStatus.State.FAILED);
+                else
+                    committer.commitJob(jobCtx);
+            }
+        }
+        catch (ClassNotFoundException | IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
new file mode 100644
index 0000000..2ff2945
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+
+/**
+ * Hadoop context implementation for the v2 API. It provides IO operations for Hadoop tasks.
+ */
+public class HadoopV2Context extends JobContextImpl implements MapContext, ReduceContext {
+    /** Input reader that overrides the HadoopTaskContext input. */
+    private RecordReader reader;
+
+    /** Output writer that overrides the HadoopTaskContext output. */
+    private RecordWriter writer;
+
+    /** Output is provided by executor environment. */
+    private final HadoopTaskOutput output;
+
+    /** Input is provided by executor environment. */
+    private final HadoopTaskInput input;
+
+    /** Unique identifier for a task attempt. */
+    private final TaskAttemptID taskAttemptID;
+
+    /** Indicates that this task is to be cancelled. */
+    private volatile boolean cancelled;
+
+    /** Input split. */
+    private InputSplit inputSplit;
+
+    /** */
+    private final HadoopTaskContext ctx;
+
+    /** */
+    private String status;
+
+    /**
+     * @param ctx Context for IO operations.
+     */
+    public HadoopV2Context(HadoopV2TaskContext ctx) {
+        super(ctx.jobConf(), ctx.jobContext().getJobID());
+
+        taskAttemptID = ctx.attemptId();
+
+        conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString());
+        conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString());
+
+        output = ctx.output();
+        input = ctx.input();
+
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public InputSplit getInputSplit() {
+        if (inputSplit == null) {
+            HadoopInputSplit split = ctx.taskInfo().inputSplit();
+
+            if (split == null)
+                return null;
+
+            if (split instanceof HadoopFileBlock) {
+                HadoopFileBlock fileBlock = (HadoopFileBlock)split;
+
+                inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null);
+            }
+            else {
+                try {
+                    inputSplit = (InputSplit)((HadoopV2TaskContext)ctx).getNativeSplit(split);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+        }
+
+        return inputSplit;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (cancelled)
+            throw new HadoopTaskCancelledException("Task cancelled.");
+
+        return reader.nextKeyValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object getCurrentKey() throws IOException, InterruptedException {
+        if (reader != null)
+            return reader.getCurrentKey();
+
+        return input.key();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object getCurrentValue() throws IOException, InterruptedException {
+        return reader.getCurrentValue();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void write(Object key, Object val) throws IOException, InterruptedException {
+        if (cancelled)
+            throw new HadoopTaskCancelledException("Task cancelled.");
+
+        if (writer != null)
+            writer.write(key, val);
+        else {
+            try {
+                output.write(key, val);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputCommitter getOutputCommitter() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public TaskAttemptID getTaskAttemptID() {
+        return taskAttemptID;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setStatus(String msg) {
+        status = msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getStatus() {
+        return status;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getProgress() {
+        return 0.5f; // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter getCounter(Enum<?> cntrName) {
+        return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter getCounter(String grpName, String cntrName) {
+        return new HadoopV2Counter(ctx.counter(grpName, cntrName, HadoopLongCounter.class));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void progress() {
+        // No-op.
+    }
+
+    /**
+     * Overrides default input data reader.
+     *
+     * @param reader New reader.
+     */
+    public void reader(RecordReader reader) {
+        this.reader = reader;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean nextKey() throws IOException, InterruptedException {
+        if (cancelled)
+            throw new HadoopTaskCancelledException("Task cancelled.");
+
+        return input.next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterable getValues() throws IOException, InterruptedException {
+        return new Iterable() {
+            @Override public Iterator iterator() {
+                return input.values();
+            }
+        };
+    }
+
+    /**
+     * @return Overridden output data writer.
+     */
+    public RecordWriter writer() {
+        return writer;
+    }
+
+    /**
+     * Overrides default output data writer.
+     *
+     * @param writer New writer.
+     */
+    public void writer(RecordWriter writer) {
+        this.writer = writer;
+    }
+
+    /**
+     * Cancels the task by stopping its IO.
+     */
+    public void cancel() {
+        cancelled = true;
+    }
+}
\ No newline at end of file

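A note on HadoopV2Context above: all of its IO goes through an "override or
fall back" scheme. A RecordReader/RecordWriter installed via reader(...) or
writer(...) takes precedence; otherwise calls are routed to the
executor-provided HadoopTaskInput/HadoopTaskOutput, and a volatile cancelled
flag aborts both paths. Below is a minimal, self-contained sketch of that
scheme; the Sink interface is a hypothetical stand-in, not an Ignite or
Hadoop API.

import java.io.IOException;

/** Sketch of the "override or fall back" IO delegation used by HadoopV2Context. */
public class OverridableOutput {
    /** Hypothetical stand-in for the executor-provided HadoopTaskOutput. */
    public interface Sink {
        void write(Object key, Object val) throws IOException;
    }

    /** Default output provided by the execution environment. */
    private final Sink dflt;

    /** Optional override installed by the task; stays null unless set. */
    private volatile Sink override;

    /** Set once to abort further IO, mirroring HadoopV2Context.cancel(). */
    private volatile boolean cancelled;

    public OverridableOutput(Sink dflt) {
        this.dflt = dflt;
    }

    /** Installs a writer that takes precedence over the default output. */
    public void override(Sink sink) {
        override = sink;
    }

    /** Requests cancellation; subsequent writes fail fast. */
    public void cancel() {
        cancelled = true;
    }

    /** Writes through the override if present, otherwise through the default. */
    public void write(Object key, Object val) throws IOException {
        if (cancelled)
            throw new IOException("Task cancelled."); // The real class throws HadoopTaskCancelledException.

        Sink s = override;

        (s != null ? s : dflt).write(key, val);
    }
}

In the sketch both the override and the flag are volatile for cross-thread
visibility without locking the write path; in HadoopV2Context itself only the
cancelled flag is volatile, since reader and writer are installed before the
task starts running.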
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
new file mode 100644
index 0000000..cad9e64
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+
+/**
+ * Adapter from the internal counter implementation to the Hadoop API Counter of version 2.0.
+ */
+public class HadoopV2Counter implements Counter {
+    /** Delegate. */
+    private final HadoopLongCounter cntr;
+
+    /**
+     * Creates new instance with given delegate.
+     *
+     * @param cntr Internal counter.
+     */
+    public HadoopV2Counter(HadoopLongCounter cntr) {
+        assert cntr != null : "counter must be non-null";
+
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDisplayName(String displayName) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return cntr.name();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getDisplayName() {
+        return getName();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getValue() {
+        return cntr.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setValue(long val) {
+        cntr.value(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void increment(long incr) {
+        cntr.increment(incr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter getUnderlyingCounter() {
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+}
\ No newline at end of file

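HadoopV2Counter above is a pure forwarding adapter: every call on the
Hadoop-facing Counter API lands on the wrapped HadoopLongCounter, while
write()/readFields() are left unsupported because counter state is shipped
through Ignite's own counter machinery. A hedged sketch of that delegate
relationship, with a hypothetical LongHolder standing in for
HadoopLongCounter:

/** Sketch of the forwarding done by HadoopV2Counter, with a stand-in delegate. */
public class CounterAdapterSketch {
    /** Hypothetical stand-in for HadoopLongCounter. */
    static class LongHolder {
        private long val;

        long value() { return val; }
        void value(long v) { val = v; }
        void increment(long i) { val += i; }
    }

    /** Hadoop-facing view; every method forwards to the delegate. */
    static class CounterView {
        private final LongHolder cntr;

        CounterView(LongHolder cntr) { this.cntr = cntr; }

        long getValue() { return cntr.value(); }
        void setValue(long v) { cntr.value(v); }
        void increment(long i) { cntr.increment(i); }
    }

    public static void main(String[] args) {
        LongHolder delegate = new LongHolder();
        CounterView view = new CounterView(delegate);

        view.increment(5);
        view.setValue(view.getValue() * 2);

        // Both sides observe the same state: prints "10 10".
        System.out.println(delegate.value() + " " + view.getValue());
    }
}

Because the adapter holds no state of its own, any number of views can sit
over the same delegate with no synchronization concerns beyond those of the
delegate itself.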
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
new file mode 100644
index 0000000..595474c
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
+import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1Splitter;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException;
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.FsCacheKey;
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
+
+/**
+ * Hadoop job implementation for v2 API.
+ */
+public class HadoopV2Job implements HadoopJob {
+    /** Job configuration. */
+    private final JobConf jobConf;
+
+    /** Job context. */
+    private final JobContextImpl jobCtx;
+
+    /** Hadoop job ID. */
+    private final HadoopJobId jobId;
+
+    /** Job info. */
+    protected final HadoopJobInfo jobInfo;
+
+    /** Native library names. */
+    private final String[] libNames;
+
+    /** Hadoop native job ID. */
+    private final JobID hadoopJobID;
+
+    /** Job resource manager. */
+    private final HadoopV2JobResourceManager rsrcMgr;
+
+    /** Task context futures, keyed by task type and number. */
+    private final ConcurrentMap<T2<HadoopTaskType, Integer>, GridFutureAdapter<HadoopTaskContext>> ctxs =
+        new ConcurrentHashMap8<>();
+
+    /** Pool of task context classes (and thus of class loading environments). */
+    private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
+
+    /** All created task context classes. */
+    private final Queue<Class<? extends HadoopTaskContext>> fullCtxClsQueue = new ConcurrentLinkedDeque<>();
+
+    /** File system cache map. */
+    private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap();
+
+    /** Local node ID. */
+    private volatile UUID locNodeId;
+
+    /** Serialized JobConf. */
+    private volatile byte[] jobConfData;
+
+    /**
+     * Constructor.
+     *
+     * @param jobId Job ID.
+     * @param jobInfo Job info.
+     * @param log Logger.
+     * @param libNames Optional additional native library names.
+     */
+    public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log,
+        @Nullable String[] libNames) {
+        assert jobId != null;
+        assert jobInfo != null;
+
+        this.jobId = jobId;
+        this.jobInfo = jobInfo;
+        this.libNames = libNames;
+
+        ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
+
+        try {
+            hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
+
+            jobConf = new JobConf();
+
+            HadoopFileSystemsUtils.setupFileSystems(jobConf);
+
+            for (Map.Entry<String,String> e : jobInfo.properties().entrySet())
+                jobConf.set(e.getKey(), e.getValue());
+
+            jobCtx = new JobContextImpl(jobConf, hadoopJobID);
+
+            rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this);
+        }
+        finally {
+            HadoopUtils.setContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobId id() {
+        return jobId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobInfo info() {
+        return jobInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
+        ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf.getClassLoader());
+
+        try {
+            String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
+
+            if (jobDirPath == null) { // The job was probably not submitted by the Hadoop client.
+                // Assume that the needed classes are available and try to generate input splits ourselves.
+                if (jobConf.getUseNewMapper())
+                    return HadoopV2Splitter.splitJob(jobCtx);
+                else
+                    return HadoopV1Splitter.splitJob(jobConf);
+            }
+
+            Path jobDir = new Path(jobDirPath);
+
+            try {
+                FileSystem fs = fileSystem(jobDir.toUri(), jobConf);
+
+                JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
+                    jobDir);
+
+                if (F.isEmpty(metaInfos))
+                    throw new IgniteCheckedException("No input splits found.");
+
+                Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir);
+
+                try (FSDataInputStream in = fs.open(splitsFile)) {
+                    Collection<HadoopInputSplit> res = new ArrayList<>(metaInfos.length);
+
+                    for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) {
+                        long off = metaInfo.getStartOffset();
+
+                        String[] hosts = metaInfo.getLocations();
+
+                        in.seek(off);
+
+                        String clsName = Text.readString(in);
+
+                        HadoopFileBlock block = HadoopV1Splitter.readFileBlock(clsName, in, hosts);
+
+                        if (block == null)
+                            block = HadoopV2Splitter.readFileBlock(clsName, in, hosts);
+
+                        res.add(block != null ? block : new HadoopExternalSplit(hosts, off));
+                    }
+
+                    return res;
+                }
+            }
+            catch (Throwable e) {
+                if (e instanceof Error)
+                    throw (Error)e;
+                else
+                    throw transformException(e);
+            }
+        }
+        finally {
+            HadoopUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"unchecked", "MismatchedQueryAndUpdateOfCollection" })
+    @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) 
throws IgniteCheckedException {
+        T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(),  
info.taskNumber());
+
+        GridFutureAdapter<HadoopTaskContext> fut = ctxs.get(locTaskId);
+
+        if (fut != null)
+            return fut.get();
+
+        GridFutureAdapter<HadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, fut = new GridFutureAdapter<>());
+
+        if (old != null)
+            return old.get();
+
+        Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll();
+
+        try {
+            if (cls == null) {
+                // If there is no pooled class, then load a new one.
+                // Note that the class loader is identified by the task it was initially created for,
+                // but later it may be reused for other tasks.
+                HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(),
+                    HadoopClassLoader.nameForTask(info, false), libNames);
+
+                cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
+
+                fullCtxClsQueue.add(cls);
+            }
+
+            Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJob.class,
+                HadoopJobId.class, UUID.class, DataInput.class);
+
+            if (jobConfData == null)
+                synchronized (jobConf) {
+                    if (jobConfData == null) {
+                        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+                        jobConf.write(new DataOutputStream(buf));
+
+                        jobConfData = buf.toByteArray();
+                    }
+                }
+
+            HadoopTaskContext res = (HadoopTaskContext)ctr.newInstance(info, this, jobId, locNodeId,
+                new DataInputStream(new ByteArrayInputStream(jobConfData)));
+
+            fut.onDone(res);
+
+            return res;
+        }
+        catch (Throwable e) {
+            IgniteCheckedException te = transformException(e);
+
+            fut.onDone(te);
+
+            if (e instanceof Error)
+                throw (Error)e;
+
+            throw te;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException {
+        assert locNodeId != null;
+
+        this.locNodeId = locNodeId;
+
+        ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
+
+        try {
+            rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId));
+        }
+        finally {
+            HadoopUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ThrowFromFinallyBlock")
+    @Override public void dispose(boolean external) throws IgniteCheckedException {
+        try {
+            if (rsrcMgr != null && !external) {
+                File jobLocDir = jobLocalDir(locNodeId, jobId);
+
+                if (jobLocDir.exists())
+                    U.delete(jobLocDir);
+            }
+        }
+        finally {
+            taskCtxClsPool.clear();
+
+            Throwable err = null;
+
+            // Stop the daemon threads that have been created
+            // with the task class loaders:
+            while (true) {
+                Class<? extends HadoopTaskContext> cls = fullCtxClsQueue.poll();
+
+                if (cls == null)
+                    break;
+
+                try {
+                    final ClassLoader ldr = cls.getClassLoader();
+
+                    try {
+                        // Stop Hadoop daemons for this *task*:
+                        stopHadoopFsDaemons(ldr);
+                    }
+                    catch (Exception e) {
+                        if (err == null)
+                            err = e;
+                    }
+
+                    // Also close all the FileSystems cached in
+                    // HadoopLazyConcurrentMap for this *task* class loader:
+                    closeCachedTaskFileSystems(ldr);
+                }
+                catch (Throwable e) {
+                    if (err == null)
+                        err = e;
+
+                    if (e instanceof Error)
+                        throw (Error)e;
+                }
+            }
+
+            assert fullCtxClsQueue.isEmpty();
+
+            try {
+                // Close all cached file systems for this *Job*:
+                fsMap.close();
+            }
+            catch (Exception e) {
+                if (err == null)
+                    err = e;
+            }
+
+            if (err != null)
+                throw U.cast(err);
+        }
+    }
+
+    /**
+     * Stops Hadoop file system daemon threads.
+     * @param ldr The task ClassLoader to stop the daemons for.
+     * @throws Exception On error.
+     */
+    private void stopHadoopFsDaemons(ClassLoader ldr) throws Exception {
+        Class<?> daemonCls = ldr.loadClass(HadoopClassLoader.CLS_DAEMON);
+
+        Method m = daemonCls.getMethod("dequeueAndStopAll");
+
+        m.invoke(null);
+    }
+
+    /**
+     * Closes all the file systems used by the task.
+     * @param ldr The task class loader.
+     * @throws Exception On error.
+     */
+    private void closeCachedTaskFileSystems(ClassLoader ldr) throws Exception {
+        Class<?> clazz = ldr.loadClass(HadoopV2TaskContext.class.getName());
+
+        Method m = clazz.getMethod("close");
+
+        m.invoke(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+        rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+        HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get();
+
+        taskCtxClsPool.add(ctx.getClass());
+
+        File locDir = taskLocalDir(locNodeId, info);
+
+        if (locDir.exists())
+            U.delete(locDir);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupStagingDirectory() {
+        rsrcMgr.cleanupStagingDirectory();
+    }
+
+    /**
+     * Getter for job configuration.
+     * @return The job configuration.
+     */
+    public JobConf jobConf() {
+        return jobConf;
+    }
+
+    /**
+     * Gets file system for this job.
+     * @param uri The URI.
+     * @param cfg The configuration.
+     * @return The file system.
+     * @throws IOException On error.
+     */
+    public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException {
+        return fileSystemForMrUserWithCaching(uri, cfg, fsMap);
+    }
+}
\ No newline at end of file
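Two concurrency idioms in HadoopV2Job.getTaskContext() above deserve a note.
The first is future-based deduplication: a GridFutureAdapter is published
into the ctxs map via putIfAbsent before the expensive context construction
starts, so concurrent callers asking for the same (task type, task number)
key block on the winner's future instead of building a duplicate. The same
idiom expressed with plain JDK types (CompletableFuture in place of
GridFutureAdapter), as an illustrative sketch only:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** Sketch of the "construct once per key" idiom from HadoopV2Job.getTaskContext(). */
public class OncePerKey<K, V> {
    /** Futures published before construction, one per key. */
    private final ConcurrentMap<K, CompletableFuture<V>> futs = new ConcurrentHashMap<>();

    /** Returns the single value built for the key; losers of the race wait for the winner. */
    public V get(K key, Supplier<V> builder) {
        CompletableFuture<V> fut = futs.get(key);

        if (fut == null) {
            CompletableFuture<V> newFut = new CompletableFuture<>();

            fut = futs.putIfAbsent(key, newFut);

            if (fut == null) { // This thread won the race and must build the value.
                try {
                    newFut.complete(builder.get());
                }
                catch (Throwable e) {
                    newFut.completeExceptionally(e); // Mirrors fut.onDone(te) in the original.
                }

                fut = newFut;
            }
        }

        return fut.join(); // Throws CompletionException if the builder failed.
    }
}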

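The second idiom is the double-checked lazy serialization of jobConfData: the
unsynchronized null check keeps the hot path lock-free, the second check
inside the synchronized block guarantees the JobConf is serialized at most
once, and the volatile field makes the published byte array safe to read
without the monitor. The shape of it in isolation, where serialize() is a
placeholder for the jobConf.write(new DataOutputStream(buf)) call in the
original:

import java.nio.charset.StandardCharsets;

/** Isolated sketch of the double-checked lazy serialization used for jobConfData. */
public class LazyBytes {
    /** volatile is what makes the unsynchronized first check safe. */
    private volatile byte[] data;

    /** Returns the bytes, serializing them on first access only. */
    public byte[] get() {
        if (data == null) {
            synchronized (this) {
                if (data == null)
                    data = serialize();
            }
        }

        return data;
    }

    /** Placeholder for the JobConf serialization in the original code. */
    private byte[] serialize() {
        return "serialized job configuration".getBytes(StandardCharsets.UTF_8);
    }
}

The original synchronizes on jobConf rather than on this, which works just as
well here because jobConf is a final field private to the job instance.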