http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
new file mode 100644
index 0000000..33aef60
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.FileSystemException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.util.RunJar;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides all resources needed for job execution. Downloads the main jar, the configuration and additional
+ * files that have to be placed on the local file system.
+ */
+class HadoopV2JobResourceManager {
+    /** File type Fs disable caching property name. */
+    private static final String FILE_DISABLE_CACHING_PROPERTY_NAME =
+        HadoopFileSystemsUtils.disableFsCachePropertyName("file");
+
+    /** Hadoop job context. */
+    private final JobContextImpl ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Job ID. */
+    private final HadoopJobId jobId;
+
+    /** Class path list. Stays {@code null} until {@link #prepareJobEnvironment(boolean, File)} resolves it. */
+    private URL[] clsPath;
+
+    /** Set of local resources. */
+    private final Collection<File> rsrcSet = new HashSet<>();
+
+    /** Staging directory used to deliver the job jar and config to the worker nodes. */
+    private Path stagingDir;
+
+    /** The job. */
+    private final HadoopV2Job job;
+
+    /**
+     * Creates new instance.
+     *
+     * @param jobId Job ID.
+     * @param ctx Hadoop job context.
+     * @param log Logger.
+     * @param job The job; used to obtain file systems for remote paths.
+     */
+    public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, HadoopV2Job job) {
+        this.jobId = jobId;
+        this.ctx = ctx;
+        this.log = log.getLogger(HadoopV2JobResourceManager.class);
+        this.job = job;
+    }
+
+    /**
+     * Set working directory in local file system.
+     *
+     * @param dir Working directory.
+     * @throws IOException If fails.
+     */
+    private void setLocalFSWorkingDirectory(File dir) throws IOException {
+        JobConf cfg = ctx.getJobConf();
+
+        // Run with the job configuration's class loader as the thread context class loader.
+        ClassLoader oldLdr = HadoopUtils.setContextClassLoader(cfg.getClassLoader());
+
+        try {
+            cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
+
+            // The local file system instance is only updated when caching for the "file" scheme is not disabled.
+            if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false))
+                FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
+        }
+        finally {
+            HadoopUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /**
+     * Prepare job resources. Resolve the classpath list and download it if needed.
+     *
+     * @param download {@code true} If need to download resources.
+     * @param jobLocDir Work directory for the job.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void prepareJobEnvironment(boolean download, File jobLocDir) throws IgniteCheckedException {
+        try {
+            if (jobLocDir.exists())
+                throw new IgniteCheckedException("Local job directory already exists: " + jobLocDir.getAbsolutePath());
+
+            JobConf cfg = ctx.getJobConf();
+
+            String mrDir = cfg.get("mapreduce.job.dir");
+
+            if (mrDir != null) {
+                stagingDir = new Path(new URI(mrDir));
+
+                if (download) {
+                    FileSystem fs = job.fileSystem(stagingDir.toUri(), cfg);
+
+                    if (!fs.exists(stagingDir))
+                        throw new IgniteCheckedException("Failed to find map-reduce submission " +
+                            "directory (does not exist): " + stagingDir);
+
+                    if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
+                        throw new IgniteCheckedException("Failed to copy job submission directory "
+                            + "contents to local file system "
+                            + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+                            + ", jobId=" + jobId + ']');
+                }
+
+                File jarJobFile = new File(jobLocDir, "job.jar");
+
+                Collection<URL> clsPathUrls = new ArrayList<>();
+
+                // The job jar is always the first class path entry.
+                clsPathUrls.add(jarJobFile.toURI().toURL());
+
+                rsrcSet.add(jarJobFile);
+                rsrcSet.add(new File(jobLocDir, "job.xml"));
+
+                processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES);
+                processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES);
+                processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls, null);
+                processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls, null);
+
+                if (!clsPathUrls.isEmpty()) {
+                    clsPath = new URL[clsPathUrls.size()];
+
+                    clsPathUrls.toArray(clsPath);
+                }
+            }
+            else if (!jobLocDir.mkdirs())
+                throw new IgniteCheckedException("Failed to create local job directory: "
+                    + jobLocDir.getAbsolutePath());
+
+            setLocalFSWorkingDirectory(jobLocDir);
+        }
+        catch (URISyntaxException | IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Process list of resources.
+     *
+     * @param jobLocDir Job working directory.
+     * @param files Array of {@link java.net.URI} or {@link org.apache.hadoop.fs.Path} to process resources.
+     * @param download {@code true}, if need to download. Process class path only else.
+     * @param extract {@code true}, if need to extract archive.
+     * @param clsPathUrls Collection to add resource as classpath resource.
+     * @param rsrcNameProp Property for resource name array setting.
+     * @throws IOException If failed.
+     */
+    private void processFiles(File jobLocDir, @Nullable Object[] files, boolean download, boolean extract,
+        @Nullable Collection<URL> clsPathUrls, @Nullable String rsrcNameProp) throws IOException {
+        if (F.isEmptyOrNulls(files))
+            return;
+
+        Collection<String> res = new ArrayList<>();
+
+        for (Object pathObj : files) {
+            Path srcPath;
+
+            if (pathObj instanceof URI) {
+                URI uri = (URI)pathObj;
+
+                srcPath = new Path(uri);
+            }
+            else
+                srcPath = (Path)pathObj;
+
+            String locName = srcPath.getName();
+
+            File dstPath = new File(jobLocDir.getAbsolutePath(), locName);
+
+            res.add(locName);
+
+            rsrcSet.add(dstPath);
+
+            if (clsPathUrls != null)
+                clsPathUrls.add(dstPath.toURI().toURL());
+
+            // Without download only the bookkeeping above (names, resources, class path) is performed.
+            if (!download)
+                continue;
+
+            JobConf cfg = ctx.getJobConf();
+
+            FileSystem dstFs = FileSystem.getLocal(cfg);
+
+            FileSystem srcFs = job.fileSystem(srcPath.toUri(), cfg);
+
+            if (extract) {
+                // Archives are first copied into a side directory and then unpacked into the destination path.
+                File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
+
+                if (!archivesPath.exists() && !archivesPath.mkdir())
+                    throw new IOException("Failed to create directory " +
+                        "[path=" + archivesPath + ", jobId=" + jobId + ']');
+
+                File archiveFile = new File(archivesPath, locName);
+
+                FileUtil.copy(srcFs, srcPath, dstFs, new Path(archiveFile.toString()), false, cfg);
+
+                // Locale-independent lower-casing: with the default locale an upper-case extension such as ".ZIP"
+                // would not match under e.g. a Turkish locale, where 'I' lower-cases to a dotless 'i'.
+                String archiveNameLC = archiveFile.getName().toLowerCase(java.util.Locale.ROOT);
+
+                if (archiveNameLC.endsWith(".jar"))
+                    RunJar.unJar(archiveFile, dstPath);
+                else if (archiveNameLC.endsWith(".zip"))
+                    FileUtil.unZip(archiveFile, dstPath);
+                else if (archiveNameLC.endsWith(".tar.gz") ||
+                    archiveNameLC.endsWith(".tgz") ||
+                    archiveNameLC.endsWith(".tar"))
+                    FileUtil.unTar(archiveFile, dstPath);
+                else
+                    throw new IOException("Cannot unpack archive [path=" + srcPath + ", jobId=" + jobId + ']');
+            }
+            else
+                FileUtil.copy(srcFs, srcPath, dstFs, new Path(dstPath.toString()), false, cfg);
+        }
+
+        if (!res.isEmpty() && rsrcNameProp != null)
+            ctx.getJobConf().setStrings(rsrcNameProp, res.toArray(new String[res.size()]));
+    }
+
+    /**
+     * Prepares working directory for the task.
+     *
+     * <ul>
+     *     <li>Creates working directory.</li>
+     *     <li>Creates symbolic links to all job resources in working directory.</li>
+     * </ul>
+     *
+     * @param path Path to working directory of the task.
+     * @throws IgniteCheckedException If fails.
+     */
+    public void prepareTaskWorkDir(File path) throws IgniteCheckedException {
+        try {
+            if (path.exists())
+                throw new IOException("Task local directory already exists: " + path);
+
+            if (!path.mkdir())
+                throw new IOException("Failed to create directory: " + path);
+
+            for (File resource : rsrcSet) {
+                File symLink = new File(path, resource.getName());
+
+                try {
+                    Files.createSymbolicLink(symLink.toPath(), resource.toPath());
+                }
+                catch (IOException e) {
+                    String msg = "Unable to create symlink \"" + symLink + "\" to \"" + resource + "\".";
+
+                    // On Windows the failure is extended with a hint about the required privilege.
+                    if (U.isWindows() && e instanceof FileSystemException)
+                        msg += "\n\nAbility to create symbolic links is required!\n" +
+                                "On Windows platform you have to grant permission 'Create symbolic links'\n" +
+                                "to your user or run the Accelerator as Administrator.\n";
+
+                    throw new IOException(msg, e);
+                }
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Unable to prepare local working directory for the task " +
+                 "[jobId=" + jobId + ", path=" + path+ ']', e);
+        }
+    }
+
+    /**
+     * Cleans up job staging directory. Failures are logged and not propagated.
+     */
+    public void cleanupStagingDirectory() {
+        try {
+            if (stagingDir != null) {
+                FileSystem fs = job.fileSystem(stagingDir.toUri(), ctx.getJobConf());
+
+                fs.delete(stagingDir, true);
+            }
+        }
+        catch (Exception e) {
+            log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);
+        }
+    }
+
+    /**
+     * Returns array of class path for current job.
+     *
+     * @return Class path array, or {@code null} if it has not been resolved.
+     */
+    @Nullable public URL[] classPath() {
+        return clsPath;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
new file mode 100644
index 0000000..fafa79b
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+
+/**
+ * Hadoop map task implementation for v2 API.
+ */
+public class HadoopV2MapTask extends HadoopV2Task {
+    /**
+     * @param taskInfo Task info.
+     */
+    public HadoopV2MapTask(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ConstantConditions", "unchecked"})
+    @Override public void run0(HadoopV2TaskContext taskCtx) throws 
IgniteCheckedException {
+        OutputFormat outputFormat = null;
+        Exception err = null;
+
+        JobContextImpl jobCtx = taskCtx.jobContext();
+
+        try {
+            InputSplit nativeSplit = hadoopContext().getInputSplit();
+
+            if (nativeSplit == null)
+                throw new IgniteCheckedException("Input split cannot be 
null.");
+
+            InputFormat inFormat = 
ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
+                hadoopContext().getConfiguration());
+
+            RecordReader reader = inFormat.createRecordReader(nativeSplit, 
hadoopContext());
+
+            reader.initialize(nativeSplit, hadoopContext());
+
+            hadoopContext().reader(reader);
+
+            HadoopJobInfo jobInfo = taskCtx.job().info();
+
+            outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? 
null : prepareWriter(jobCtx);
+
+            Mapper mapper = 
ReflectionUtils.newInstance(jobCtx.getMapperClass(), 
hadoopContext().getConfiguration());
+
+            try {
+                mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
+            }
+            finally {
+                closeWriter();
+            }
+
+            commit(outputFormat);
+        }
+        catch (InterruptedException e) {
+            err = e;
+
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        catch (Exception e) {
+            err = e;
+
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            if (err != null)
+                abort(outputFormat);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java
new file mode 100644
index 0000000..e199ede
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+
+/**
+ * Hadoop partitioner adapter for v2 API. Delegates partitioning to a Hadoop {@link Partitioner}
+ * instantiated reflectively from the given class.
+ */
+public class HadoopV2Partitioner implements HadoopPartitioner {
+    /** Partitioner instance. Assigned once in the constructor. */
+    private final Partitioner<Object, Object> part;
+
+    /**
+     * @param cls Hadoop partitioner class.
+     * @param conf Job configuration.
+     */
+    @SuppressWarnings("unchecked") // Key/value types are not known here, so the delegate is treated as untyped.
+    public HadoopV2Partitioner(Class<? extends Partitioner<?, ?>> cls, Configuration conf) {
+        part = (Partitioner<Object, Object>)ReflectionUtils.newInstance(cls, conf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition(Object key, Object val, int parts) {
+        return part.getPartition(key, val, parts);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java
new file mode 100644
index 0000000..e5c2ed2
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+
+/**
+ * Hadoop reduce task implementation for v2 API.
+ */
+public class HadoopV2ReduceTask extends HadoopV2Task {
+    /** {@code True} if reduce, {@code false} if combine. */
+    private final boolean reduce;
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     * @param reduce {@code True} if reduce, {@code false} if combine.
+     */
+    public HadoopV2ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) {
+        super(taskInfo);
+
+        this.reduce = reduce;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ConstantConditions", "unchecked"})
+    @Override public void run0(HadoopV2TaskContext taskCtx) throws 
IgniteCheckedException {
+        OutputFormat outputFormat = null;
+        Exception err = null;
+
+        JobContextImpl jobCtx = taskCtx.jobContext();
+
+        try {
+            outputFormat = reduce || !taskCtx.job().info().hasReducer() ? 
prepareWriter(jobCtx) : null;
+
+            Reducer reducer;
+            if (reduce) reducer = 
ReflectionUtils.newInstance(jobCtx.getReducerClass(),
+                jobCtx.getConfiguration());
+            else reducer = 
ReflectionUtils.newInstance(jobCtx.getCombinerClass(),
+                jobCtx.getConfiguration());
+
+            try {
+                reducer.run(new 
WrappedReducer().getReducerContext(hadoopContext()));
+            }
+            finally {
+                closeWriter();
+            }
+
+            commit(outputFormat);
+        }
+        catch (InterruptedException e) {
+            err = e;
+
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        catch (Exception e) {
+            err = e;
+
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            if (err != null)
+                abort(outputFormat);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
new file mode 100644
index 0000000..49b5ee7
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+
+/**
+ * Job setup task: validates the output specification and sets the job up through the output committer.
+ */
+public class HadoopV2SetupTask extends HadoopV2Task {
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    public HadoopV2SetupTask(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ConstantConditions")
+    @Override protected void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException {
+        try {
+            JobContextImpl jobCtx = taskCtx.jobContext();
+
+            OutputFormat outFormat = getOutputFormat(jobCtx);
+
+            outFormat.checkOutputSpecs(jobCtx);
+
+            OutputCommitter outCommitter = outFormat.getOutputCommitter(hadoopContext());
+
+            // A missing committer simply means there is nothing to set up.
+            if (outCommitter != null)
+                outCommitter.setupJob(jobCtx);
+        }
+        catch (InterruptedException e) {
+            // Preserve the interruption status for the callers.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        catch (ClassNotFoundException | IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java
new file mode 100644
index 0000000..f4ed668
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Splitter for the Hadoop v2 API.
+ */
+public class HadoopV2Splitter {
+    /** Hosts array used when no hosts are supplied. */
+    private static final String[] EMPTY_HOSTS = {};
+
+    /**
+     * Computes native input splits for the job and converts them to {@link HadoopInputSplit}s.
+     *
+     * @param ctx Job context.
+     * @return Collection of mapped splits.
+     * @throws IgniteCheckedException If mapping failed.
+     */
+    public static Collection<HadoopInputSplit> splitJob(JobContext ctx) throws IgniteCheckedException {
+        try {
+            InputFormat<?, ?> inFormat = ReflectionUtils.newInstance(ctx.getInputFormatClass(),
+                ctx.getConfiguration());
+
+            assert inFormat != null;
+
+            List<InputSplit> nativeSplits = inFormat.getSplits(ctx);
+
+            Collection<HadoopInputSplit> mapped = new ArrayList<>(nativeSplits.size());
+
+            // Position of the split in the native list; advanced for every split, file-based or not.
+            int idx = 0;
+
+            for (InputSplit cur : nativeSplits) {
+                if (!(cur instanceof FileSplit))
+                    mapped.add(HadoopUtils.wrapSplit(idx, cur, cur.getLocations()));
+                else {
+                    FileSplit fileSplit = (FileSplit)cur;
+
+                    mapped.add(new HadoopFileBlock(fileSplit.getLocations(), fileSplit.getPath().toUri(),
+                        fileSplit.getStart(), fileSplit.getLength()));
+                }
+
+                idx++;
+            }
+
+            return mapped;
+        }
+        catch (InterruptedException e) {
+            // Preserve the interruption status for the callers.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Reads a serialized {@link FileSplit} and converts it to a file block.
+     *
+     * @param clsName Input split class name.
+     * @param in Input stream.
+     * @param hosts Optional hosts.
+     * @return File block or {@code null} if it is not a {@link FileSplit} instance.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static HadoopFileBlock readFileBlock(String clsName, DataInput in, @Nullable String[] hosts)
+        throws IgniteCheckedException {
+        if (FileSplit.class.getName().equals(clsName)) {
+            FileSplit fileSplit = new FileSplit();
+
+            try {
+                fileSplit.readFields(in);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException(e);
+            }
+
+            String[] blockHosts = hosts != null ? hosts : EMPTY_HOSTS;
+
+            return new HadoopFileBlock(blockHosts, fileSplit.getPath().toUri(), fileSplit.getStart(),
+                fileSplit.getLength());
+        }
+
+        // Only exact FileSplit instances are supported here.
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java
new file mode 100644
index 0000000..1383a61
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTask;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extended Hadoop v2 task.
+ */
+public abstract class HadoopV2Task extends HadoopTask {
+    /** Hadoop context. */
+    private HadoopV2Context hadoopCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    protected HadoopV2Task(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(HadoopTaskContext taskCtx) throws 
IgniteCheckedException {
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        hadoopCtx = new HadoopV2Context(ctx);
+
+        run0(ctx);
+    }
+
+    /**
+     * Internal task routine.
+     *
+     * @param taskCtx Task context.
+     * @throws IgniteCheckedException
+     */
+    protected abstract void run0(HadoopV2TaskContext taskCtx) throws 
IgniteCheckedException;
+
+    /**
+     * @return hadoop context.
+     */
+    protected HadoopV2Context hadoopContext() {
+        return hadoopCtx;
+    }
+
+    /**
+     * Create and configure an OutputFormat instance.
+     *
+     * @param jobCtx Job context.
+     * @return Instance of OutputFormat is specified in job configuration.
+     * @throws ClassNotFoundException If specified class not found.
+     */
+    protected OutputFormat getOutputFormat(JobContext jobCtx) throws 
ClassNotFoundException {
+        return ReflectionUtils.newInstance(jobCtx.getOutputFormatClass(), 
hadoopContext().getConfiguration());
+    }
+
+    /**
+     * Put write into Hadoop context and return associated output format 
instance.
+     *
+     * @param jobCtx Job context.
+     * @return Output format.
+     * @throws IgniteCheckedException In case of Grid exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected OutputFormat prepareWriter(JobContext jobCtx)
+        throws IgniteCheckedException, InterruptedException {
+        try {
+            OutputFormat outputFormat = getOutputFormat(jobCtx);
+
+            assert outputFormat != null;
+
+            OutputCommitter outCommitter = 
outputFormat.getOutputCommitter(hadoopCtx);
+
+            if (outCommitter != null)
+                outCommitter.setupTask(hadoopCtx);
+
+            RecordWriter writer = outputFormat.getRecordWriter(hadoopCtx);
+
+            hadoopCtx.writer(writer);
+
+            return outputFormat;
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Closes writer.
+     *
+     * @throws Exception If fails and logger hasn't been specified.
+     */
+    protected void closeWriter() throws Exception {
+        RecordWriter writer = hadoopCtx.writer();
+
+        if (writer != null)
+            writer.close(hadoopCtx);
+    }
+
+    /**
+     * Setup task.
+     *
+     * @param outputFormat Output format.
+     * @throws IOException In case of IO exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected void setup(@Nullable OutputFormat outputFormat) throws 
IOException, InterruptedException {
+        if (hadoopCtx.writer() != null) {
+            assert outputFormat != null;
+
+            outputFormat.getOutputCommitter(hadoopCtx).setupTask(hadoopCtx);
+        }
+    }
+
+    /**
+     * Commit task.
+     *
+     * @param outputFormat Output format.
+     * @throws IgniteCheckedException In case of Grid exception.
+     * @throws IOException In case of IO exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected void commit(@Nullable OutputFormat outputFormat) throws 
IgniteCheckedException, IOException, InterruptedException {
+        if (hadoopCtx.writer() != null) {
+            assert outputFormat != null;
+
+            OutputCommitter outputCommitter = 
outputFormat.getOutputCommitter(hadoopCtx);
+
+            if (outputCommitter.needsTaskCommit(hadoopCtx))
+                outputCommitter.commitTask(hadoopCtx);
+        }
+    }
+
+    /**
+     * Abort task.
+     *
+     * @param outputFormat Output format.
+     */
+    protected void abort(@Nullable OutputFormat outputFormat) {
+        if (hadoopCtx.writer() != null) {
+            assert outputFormat != null;
+
+            try {
+                
outputFormat.getOutputCommitter(hadoopCtx).abortTask(hadoopCtx);
+            }
+            catch (IOException ignore) {
+                // Ignore.
+            }
+            catch (InterruptedException ignore) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        hadoopCtx.cancel();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
new file mode 100644
index 0000000..4b1121c
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTask;
+import 
org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
+import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1CleanupTask;
+import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1MapTask;
+import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1Partitioner;
+import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1ReduceTask;
+import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1SetupTask;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir;
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir;
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException;
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.unwrapSplit;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.FsCacheKey;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+
+/**
+ * Context for task execution.
+ */
+public class HadoopV2TaskContext extends HadoopTaskContext {
+    /** */
+    private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
+
+    /** Lazy per-user file system cache used by the Hadoop task. */
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
+        = createHadoopLazyConcurrentMap();
+
+    /**
+     * Closes the static file system cache ({@code fsMap}) of this class.
+     * <p>
+     * This method is called with reflection upon job finish with the class loader 
of each task.
+     * It cleans up all the file systems created for the specific task.
+     * Each class loader uses its own instance of {@code fsMap} 
since the class loaders
+     * are different.
+     *
+     * @throws IgniteCheckedException On error.
+     */
+    public static void close() throws IgniteCheckedException {
+        fsMap.close();
+    }
+
+    /**
+     * Detects whether {@link JobContext} declares {@code getCombinerKeyGroupingComparator}
+     * (combiner grouping support, available since Hadoop 2.3) and records the outcome.
+     */
+    static {
+        boolean supported = true;
+
+        try {
+            JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator");
+        }
+        catch (NoSuchMethodException ignore) {
+            // The method is absent in this Hadoop version.
+            supported = false;
+        }
+
+        COMBINE_KEY_GROUPING_SUPPORTED = supported;
+    }
+
+    /** Flag is set if new context-object code is used for running the mapper. 
*/
+    private final boolean useNewMapper;
+
+    /** Flag is set if new context-object code is used for running the 
reducer. */
+    private final boolean useNewReducer;
+
+    /** Flag is set if new context-object code is used for running the 
combiner. */
+    private final boolean useNewCombiner;
+
+    /** */
+    private final JobContextImpl jobCtx;
+
+    /** Set if the task is being cancelled. */
+    private volatile boolean cancelled;
+
+    /** Current task. */
+    private volatile HadoopTask task;
+
+    /** Local node ID */
+    private final UUID locNodeId;
+
+    /** Counters for task. */
+    private final HadoopCounters cntrs = new HadoopCountersImpl();
+
+    /**
+     * Creates the task context, reading the job configuration from the given input.
+     *
+     * @param taskInfo Task info.
+     * @param job Job.
+     * @param jobId Job ID.
+     * @param locNodeId Local node ID.
+     * @param jobConfDataInput DataInput to read the JobConf from.
+     * @throws IgniteCheckedException If the job configuration could not be read.
+     */
+    public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId,
+        @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
+        super(taskInfo, job);
+
+        this.locNodeId = locNodeId;
+
+        // The context class loader has to be switched before the JobConf instance is created.
+        ClassLoader prevLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
+
+        try {
+            JobConf conf = new JobConf();
+
+            conf.readFields(jobConfDataInput);
+
+            // For map-reduce jobs prefer local writes.
+            conf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
+
+            jobCtx = new JobContextImpl(conf, new JobID(jobId.globalId().toString(), jobId.localId()));
+
+            useNewMapper = conf.getUseNewMapper();
+            useNewReducer = conf.getUseNewReducer();
+            useNewCombiner = conf.getCombinerClass() == null;
+        }
+        catch (IOException e) {
+            // Only readFields() declares IOException in the block above.
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            HadoopUtils.restoreContextClassLoader(prevLdr);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to the counters instance owned by this task context.
+     */
+    @Override public <T extends HadoopCounter> T counter(String grp, String 
name, Class<T> cls) {
+        return cntrs.counter(grp, name, cls);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the counters instance owned by this task context.
+     */
+    @Override public HadoopCounters counters() {
+        return cntrs;
+    }
+
+    /**
+     * Instantiates the task implementation that matches the type of the current task info. The v1 or
+     * v2 variant is picked according to the API flags read from the job configuration.
+     *
+     * @return Task, or {@code null} if the task type is not one of the handled types.
+     */
+    private HadoopTask createTask() {
+        switch (taskInfo().type()) {
+            case SETUP:
+                if (useNewMapper)
+                    return new HadoopV2SetupTask(taskInfo());
+
+                return new HadoopV1SetupTask(taskInfo());
+
+            case MAP:
+                if (useNewMapper)
+                    return new HadoopV2MapTask(taskInfo());
+
+                return new HadoopV1MapTask(taskInfo());
+
+            case REDUCE:
+                if (useNewReducer)
+                    return new HadoopV2ReduceTask(taskInfo(), true);
+
+                return new HadoopV1ReduceTask(taskInfo(), true);
+
+            case COMBINE:
+                if (useNewCombiner)
+                    return new HadoopV2ReduceTask(taskInfo(), false);
+
+                return new HadoopV1ReduceTask(taskInfo(), false);
+
+            case COMMIT:
+            case ABORT: {
+                // Commit and abort share the cleanup task; the flag tells them apart.
+                boolean abort = taskInfo().type() == HadoopTaskType.ABORT;
+
+                if (useNewReducer)
+                    return new HadoopV2CleanupTask(taskInfo(), abort);
+
+                return new HadoopV1CleanupTask(taskInfo(), abort);
+            }
+
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates the task for the current task info and runs it while the class loader of the job
+     * configuration is installed through {@code HadoopUtils.setContextClassLoader}. An {@link Error}
+     * is rethrown as is; any other throwable is passed through {@code transformException}. If
+     * cancellation was requested before the task is started, {@link HadoopTaskCancelledException} is
+     * thrown instead of running it. On exit the current task reference is cleared and the previous
+     * class loader is restored.
+     */
+    @Override public void run() throws IgniteCheckedException {
+        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(jobConf().getClassLoader());
+
+        try {
+            try {
+                task = createTask(); // Published through the volatile field so that cancel() can see it.
+            }
+            catch (Throwable e) {
+                if (e instanceof Error)
+                    throw e;
+
+                throw transformException(e);
+            }
+
+            if (cancelled) // Checked outside the catch blocks, so this exception is not transformed.
+                throw new HadoopTaskCancelledException("Task cancelled.");
+
+            try {
+                task.run(this);
+            }
+            catch (Throwable e) {
+                if (e instanceof Error)
+                    throw e;
+
+                throw transformException(e);
+            }
+        }
+        finally {
+            task = null; // After this point cancel() only raises the flag.
+
+            HadoopUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Raises the cancellation flag and, if a task is currently set, forwards the cancel request to it.
+     */
+    @Override public void cancel() {
+        cancelled = true;
+
+        HadoopTask t = task; // Read the volatile field once: run() resets it to null when it exits.
+
+        if (t != null)
+            t.cancel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareTaskEnvironment() throws 
IgniteCheckedException {
+        File locDir;
+
+        switch(taskInfo().type()) {
+            case MAP:
+            case REDUCE:
+                job().prepareTaskEnvironment(taskInfo());
+
+                locDir = taskLocalDir(locNodeId, taskInfo());
+
+                break;
+
+            default:
+                locDir = jobLocalDir(locNodeId, taskInfo().jobId());
+        }
+
+        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(jobConf().getClassLoader());
+
+        try {
+            FileSystem.get(jobConf());
+
+            LocalFileSystem locFs = FileSystem.getLocal(jobConf());
+
+            locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
+        }
+        catch (Throwable e) {
+            if (e instanceof Error)
+                throw (Error)e;
+
+            throw transformException(e);
+        }
+        finally {
+            HadoopUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to the job, passing the current task info.
+     */
+    @Override public void cleanupTaskEnvironment() throws 
IgniteCheckedException {
+        job().cleanupTaskEnvironment(taskInfo());
+    }
+
+    /**
+     * Builds a Hadoop task attempt ID from the Hadoop job ID of the job context and the type,
+     * number and attempt of the current task info.
+     *
+     * @return Attempt ID.
+     */
+    public TaskAttemptID attemptId() {
+        return new TaskAttemptID(
+            new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber()),
+            taskInfo().attempt());
+    }
+
+    /**
+     * Maps an Ignite task type to the Hadoop task type: setup to job setup, map and combine to map,
+     * reduce to reduce, commit and abort to job cleanup.
+     *
+     * @param type Task type.
+     * @return Hadoop task type, or {@code null} for any other type.
+     */
+    private TaskType taskType(HadoopTaskType type) {
+        TaskType hadoopType;
+
+        switch (type) {
+            case SETUP:
+                hadoopType = TaskType.JOB_SETUP;
+
+                break;
+
+            case MAP:
+            case COMBINE:
+                hadoopType = TaskType.MAP;
+
+                break;
+
+            case REDUCE:
+                hadoopType = TaskType.REDUCE;
+
+                break;
+
+            case COMMIT:
+            case ABORT:
+                hadoopType = TaskType.JOB_CLEANUP;
+
+                break;
+
+            default:
+                hadoopType = null;
+        }
+
+        return hadoopType;
+    }
+
+    /**
+     * Gets job configuration of the task, as held by the job context of this task context.
+     *
+     * @return Job configuration.
+     */
+    public JobConf jobConf() {
+        return jobCtx.getJobConf();
+    }
+
+    /**
+     * Gets job context of the task. The context is created once, in the constructor.
+     *
+     * @return Job context.
+     */
+    public JobContextImpl jobContext() {
+        return jobCtx;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * A v1 partitioner is created when the {@code mapred.partitioner.class} property resolves to
+     * a class; otherwise a v2 partitioner is created from the job context.
+     */
+    @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
+        JobConf conf = jobConf();
+
+        boolean oldApi = conf.getClass("mapred.partitioner.class", null) != null;
+
+        if (!oldApi) {
+            try {
+                return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), conf);
+            }
+            catch (ClassNotFoundException e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
+
+        return new HadoopV1Partitioner(conf.getPartitionerClass(), conf);
+    }
+
+    /**
+     * Resolves the serialization to use for the given class. Classes handled by
+     * {@link WritableSerialization} get the dedicated {@link HadoopWritableSerialization}; every
+     * other Hadoop serialization is wrapped into {@link HadoopSerializationWrapper}.
+     *
+     * @param cls Class.
+     * @param jobConf Job configuration.
+     * @return Appropriate serializer.
+     * @throws IgniteCheckedException If no serialization is found for the class.
+     */
+    @SuppressWarnings("unchecked")
+    private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
+        A.notNull(cls, "cls");
+
+        Serialization<?> ser = new SerializationFactory(jobConf).getSerialization(cls);
+
+        if (ser == null)
+            throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
+
+        boolean writable = ser.getClass() == WritableSerialization.class;
+
+        if (!writable)
+            return new HadoopSerializationWrapper(ser, cls);
+
+        return new HadoopWritableSerialization((Class<? extends Writable>)cls);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Resolves the serialization for the map output key class reported by the job context.
+     */
+    @Override public HadoopSerialization keySerialization() throws 
IgniteCheckedException {
+        return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Resolves the serialization for the map output value class reported by the job context.
+     */
+    @Override public HadoopSerialization valueSerialization() throws 
IgniteCheckedException {
+        return getSerialization(jobCtx.getMapOutputValueClass(), jobConf());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the sort comparator of the job context, cast to {@code Comparator<Object>}.
+     */
+    @Override public Comparator<Object> sortComparator() {
+        return (Comparator<Object>)jobCtx.getSortComparator();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only combine and reduce tasks have a grouping comparator. For combine tasks the combiner key
+     * grouping comparator is used when the running Hadoop version supports it. {@code null} is
+     * returned when there is no comparator or when its class is the same as the class of the
+     * sort comparator.
+     */
+    @Override public Comparator<Object> groupComparator() {
+        Comparator<?> grpCmp;
+
+        switch (taskInfo().type()) {
+            case COMBINE:
+                if (COMBINE_KEY_GROUPING_SUPPORTED)
+                    grpCmp = jobContext().getCombinerKeyGroupingComparator();
+                else
+                    grpCmp = jobContext().getGroupingComparator();
+
+                break;
+
+            case REDUCE:
+                grpCmp = jobContext().getGroupingComparator();
+
+                break;
+
+            default:
+                return null;
+        }
+
+        if (grpCmp == null || grpCmp.getClass() == sortComparator().getClass())
+            return null;
+
+        return (Comparator<Object>)grpCmp;
+    }
+
+    /**
+     * @param split Split.
+     * @return Native Hadoop split.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    public Object getNativeSplit(HadoopInputSplit split) throws 
IgniteCheckedException {
+        if (split instanceof HadoopExternalSplit)
+            return readExternalSplit((HadoopExternalSplit)split);
+
+        if (split instanceof HadoopSplitWrapper)
+            return unwrapSplit((HadoopSplitWrapper)split);
+
+        throw new IllegalStateException("Unknown split: " + split);
+    }
+
+    /**
+     * Reads a native input split from the job split file located in the job directory. The stream is
+     * positioned at the offset stored in the external split; the split class name is read first and
+     * the split itself is then deserialized with the Hadoop serialization registered for that class.
+     *
+     * @param split External split.
+     * @return Native input split.
+     * @throws IgniteCheckedException If the file system or the split file cannot be accessed, the split
+     *      class cannot be loaded, or no serialization is registered for the split class.
+     */
+    @SuppressWarnings("unchecked")
+    private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
+        Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
+
+        FileSystem fs;
+
+        try {
+            // This assertion uses .startsWith() instead of .equals() because task class loaders may
+            // be reused between tasks of the same job.
+            assert ((HadoopClassLoader)getClass().getClassLoader()).name()
+                .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true));
+
+            // We also cache Fs there, all them will be cleared explicitly upon the Job end.
+            fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        try (FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
+            in.seek(split.offset());
+
+            String clsName = Text.readString(in);
+
+            Class<?> cls = jobConf().getClassByName(clsName);
+
+            assert cls != null;
+
+            Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls);
+
+            // The factory returns null when no registered serialization accepts the class; fail with
+            // a descriptive error (same as getSerialization()) rather than a NullPointerException.
+            if (serialization == null)
+                throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
+
+            Deserializer deserializer = serialization.getDeserializer(cls);
+
+            deserializer.open(in);
+
+            Object res = deserializer.deserialize(null);
+
+            deserializer.close();
+
+            assert res != null;
+
+            return res;
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The callable is invoked directly when the short name of the current UGI user equals the
+     * (fixed) job user name; otherwise it is invoked through {@code doAs} of the UGI obtained for
+     * the job user. Any exception raised by the call is wrapped into {@link IgniteCheckedException}.
+     */
+    @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
+        String jobUser = IgfsUtils.fixUserName(job.info().user());
+
+        assert jobUser != null;
+
+        String curUser;
+
+        try {
+            UserGroupInformation curUgi = UserGroupInformation.getCurrentUser();
+
+            assert curUgi != null;
+
+            curUser = curUgi.getShortUserName();
+        }
+        catch (IOException ioe) {
+            throw new IgniteCheckedException(ioe);
+        }
+
+        try {
+            if (!F.eq(jobUser, curUser)) {
+                UserGroupInformation jobUgi = UserGroupInformation.getBestUGI(null, jobUser);
+
+                PrivilegedExceptionAction<T> act = new PrivilegedExceptionAction<T>() {
+                    @Override public T run() throws Exception {
+                        return c.call();
+                    }
+                };
+
+                return jobUgi.doAs(act);
+            }
+
+            // The current UGI context user is already the job owner: call directly.
+            return c.call();
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
new file mode 100644
index 0000000..f46f068
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link HadoopSerialization} for Hadoop {@link Writable} types: objects are written and read
+ * directly through their own {@code write} and {@code readFields} methods.
+ */
+public class HadoopWritableSerialization implements HadoopSerialization {
+    /** Writable class handled by this serialization. */
+    private final Class<? extends Writable> cls;
+
+    /**
+     * @param cls Writable class, must not be {@code null}.
+     */
+    public HadoopWritableSerialization(Class<? extends Writable> cls) {
+        assert cls != null;
+
+        this.cls = cls;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The object is expected to be an instance of the class given to the constructor.
+     */
+    @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
+        assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass();
+
+        Writable writable = (Writable)obj;
+
+        try {
+            writable.write(out);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * When an object is passed in, it is reused as the target of the read; otherwise a new instance
+     * of the class is created.
+     */
+    @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
+        Writable target;
+
+        if (obj != null)
+            target = cls.cast(obj);
+        else
+            target = U.newInstance(cls);
+
+        try {
+            target.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        return target;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
 
b/modules/hadoop-impl/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 0000000..8d5957b
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1 @@
+org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
new file mode 100644
index 0000000..5a20a75
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.hadoop;
+
+import org.apache.ignite.configuration.HadoopConfiguration;
+
+/**
+ * Hadoop client protocol tests in embedded process mode.
+ * <p>
+ * Runs the tests inherited from {@link HadoopClientProtocolSelfTest}.
+ */
+public class HadoopClientProtocolEmbeddedSelfTest extends 
HadoopClientProtocolSelfTest {
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Currently returns the parent configuration unchanged: the call that would switch off external
+     * execution is commented out pending IGNITE-404.
+     */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
new file mode 100644
index 0000000..1344e26
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.hadoop;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.StringTokenizer;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.HadoopAbstractSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Hadoop client protocol tests in external process mode.
+ */
+@SuppressWarnings("ResultOfMethodCallIgnored")
+public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
+    /** Input path: IGFS directory into which the tests write the job input file. */
+    private static final String PATH_INPUT = "/input";
+
+    /** Output path: IGFS directory passed to the job as its output location. */
+    private static final String PATH_OUTPUT = "/output";
+
+    /** Job name set on the job submitted by {@link #checkJobSubmit(boolean, boolean)}. */
+    private static final String JOB_NAME = "myJob";
+
+    /**
+     * Setup lock file. While it exists, {@code TestOutputCommitter.setupJob()} keeps polling and does not
+     * proceed to the delegate; a test deletes the file to let job setup continue.
+     */
+    private static File setupLockFile = new File(U.isWindows() ? 
System.getProperty("java.io.tmpdir") : "/tmp",
+        "ignite-lock-setup.file");
+
+    /**
+     * Map lock file. While it exists, {@code TestMapper.map()} keeps polling before it emits anything;
+     * a test deletes the file to let the map phase continue.
+     */
+    private static File mapLockFile = new File(U.isWindows() ? 
System.getProperty("java.io.tmpdir") : "/tmp",
+        "ignite-lock-map.file");
+
+    /**
+     * Reduce lock file. While it exists, {@code TestReducer.reduce()} keeps polling before it sums the
+     * values; a test deletes the file to let the reduce phase continue.
+     */
+    private static File reduceLockFile = new File(U.isWindows() ? 
System.getProperty("java.io.tmpdir") : "/tmp",
+        "ignite-lock-reduce.file");
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This test uses two nodes; the value is passed to {@code startGrids} in {@link #beforeTestsStarted()}.
+     */
+    @Override protected int gridCount() {
+        return 2;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Always {@code true}: the tests write their job input through the IGFS instance of node 0.
+     * NOTE(review): presumably the parent class configures IGFS based on this flag - confirm.
+     */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Always {@code true}: the client configuration built by this test addresses the job master at
+     * {@code 127.0.0.1:REST_PORT}.
+     * NOTE(review): presumably the parent class opens that port based on this flag - confirm.
+     */
+    @Override protected boolean restEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(gridCount());
+
+        setupLockFile.delete();
+        mapLockFile.delete();
+        reduceLockFile.delete();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stops all grid nodes first and only then delegates to the parent hook.
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+
+//        IgniteHadoopClientProtocolProvider.cliMap.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        setupLockFile.createNewFile();
+        mapLockFile.createNewFile();
+        reduceLockFile.createNewFile();
+
+        setupLockFile.deleteOnExit();
+        mapLockFile.deleteOnExit();
+        reduceLockFile.deleteOnExit();
+
+        super.beforeTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        grid(0).fileSystem(HadoopAbstractSelfTest.igfsName).format();
+
+        setupLockFile.delete();
+        mapLockFile.delete();
+        reduceLockFile.delete();
+
+        super.afterTest();
+    }
+
+    /**
+     * Checks that two consecutive job ID requests return distinct IDs, each with a non-null
+     * job tracker identifier.
+     * <p>
+     * NOTE(review): the method is private and named {@code tst...}, so it is presumably not picked up
+     * by the test runner - confirm whether it is disabled on purpose.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private void tstNextJobId() throws Exception {
+        ClientProtocol proto = provider().create(config(HadoopAbstractSelfTest.REST_PORT));
+
+        JobID first = proto.getNewJobID();
+
+        assert first != null;
+        assert first.getJtIdentifier() != null;
+
+        JobID second = proto.getNewJobID();
+
+        assert second != null;
+        assert second.getJtIdentifier() != null;
+
+        assert !F.eq(first, second);
+    }
+
+    /**
+     * Tests job counters retrieval.
+     * <p>
+     * Writes a nine-line input file (three distinct words, three occurrences each), submits a job built
+     * from {@code TestCountingMapper}, {@code TestCountingCombiner} and {@code TestCountingReducer},
+     * releases the setup and map lock files, waits for completion and verifies the reported counters.
+     * <p>
+     * The reduce lock file is not released here: the counting combiner and reducer override
+     * {@code reduce()} without calling the parent implementation, so they never poll that file.
+     * <p>
+     * NOTE(review): the expected {@code COUNTER1} total of 15 presumably breaks down as 9 mapper calls
+     * plus 3 combiner calls plus 3 reducer calls (one per distinct word) - confirm against how often the
+     * engine invokes the combiner.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJobCounters() throws Exception {
+        IgniteFileSystem igfs = 
grid(0).fileSystem(HadoopAbstractSelfTest.igfsName);
+
+        igfs.mkdirs(new IgfsPath(PATH_INPUT));
+
+        try (BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(igfs.create(
+            new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
+
+            bw.write(
+                "alpha\n" +
+                "beta\n" +
+                "gamma\n" +
+                "alpha\n" +
+                "beta\n" +
+                "gamma\n" +
+                "alpha\n" +
+                "beta\n" +
+                "gamma\n"
+            );
+        }
+
+        Configuration conf = config(HadoopAbstractSelfTest.REST_PORT);
+
+        final Job job = Job.getInstance(conf);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestCountingMapper.class);
+        job.setReducerClass(TestCountingReducer.class);
+        job.setCombinerClass(TestCountingCombiner.class);
+
+        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+
+        job.submit();
+
+        final Counter cntr = 
job.getCounters().findCounter(TestCounter.COUNTER1);
+
+        assertEquals(0, cntr.getValue()); // Setup is still blocked by the lock file, nothing has been counted yet.
+
+        cntr.increment(10); // NOTE(review): presumably changes only this client-side object - the final value asserted below is 15.
+
+        assertEquals(10, cntr.getValue());
+
+        // Transferring to map phase.
+        setupLockFile.delete();
+
+        // Transferring to reduce phase.
+        mapLockFile.delete();
+
+        job.waitForCompletion(false);
+
+        assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, 
job.getStatus().getState());
+
+        final Counters counters = job.getCounters(); // Fetched again after completion.
+
+        assertNotNull("counters cannot be null", counters);
+        assertEquals("wrong counters count", 3, counters.countCounters());
+        assertEquals("wrong counter value", 15, 
counters.findCounter(TestCounter.COUNTER1).getValue());
+        assertEquals("wrong counter value", 3, 
counters.findCounter(TestCounter.COUNTER2).getValue());
+        assertEquals("wrong counter value", 3, 
counters.findCounter(TestCounter.COUNTER3).getValue());
+    }
+
+    /**
+     * Requests counters for a randomly generated job ID and expects the call to fail
+     * with an {@link IOException}.
+     *
+     * @throws Exception If failed.
+     */
+    private void tstUnknownJobCounters() throws Exception {
+        ClientProtocol proto = provider().create(config(HadoopAbstractSelfTest.REST_PORT));
+
+        try {
+            JobID unknownId = new JobID(UUID.randomUUID().toString(), -1);
+
+            proto.getJobCounters(unknownId);
+
+            fail("exception must be thrown");
+        }
+        catch (Exception err) {
+            assert err instanceof IOException : "wrong error has been thrown";
+        }
+    }
+
+    /**
+     * Runs the job submission scenario without a combiner and without reducers
+     * ({@code noCombiners = true}, {@code noReducers = true}).
+     *
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMap() throws Exception {
+        checkJobSubmit(true, true);
+    }
+
+    /**
+     * Runs the job submission scenario with a combiner but without reducers
+     * ({@code noCombiners = false}, {@code noReducers = true}).
+     *
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMapCombine() throws Exception {
+        checkJobSubmit(false, true);
+    }
+
+    /**
+     * Runs the job submission scenario without a combiner but with reducers
+     * ({@code noCombiners = true}, {@code noReducers = false}).
+     *
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMapReduce() throws Exception {
+        checkJobSubmit(true, false);
+    }
+
+    /**
+     * Runs the job submission scenario with both a combiner and reducers
+     * ({@code noCombiners = false}, {@code noReducers = false}).
+     *
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMapCombineReduce() throws Exception {
+        checkJobSubmit(false, false);
+    }
+
+    /**
+     * Test job submission.
+     * <p>
+     * Submits a single-word job and walks it through the setup, map and (unless {@code noReducers} is set)
+     * reduce phases. Each phase is held back by its lock file; while a phase is blocked the test checks that
+     * the progress reported for it grows, then deletes the lock file to let the job advance. Finally the
+     * job is awaited, its terminal status is validated and the output directory is dumped to stdout.
+     *
+     * @param noCombiners Whether there are no combiners.
+     * @param noReducers Whether there are no reducers.
+     * @throws Exception If failed.
+     */
+    public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception {
+        IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName);
+
+        igfs.mkdirs(new IgfsPath(PATH_INPUT));
+
+        try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
+            new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
+
+            bw.write("word");
+        }
+
+        Configuration conf = config(HadoopAbstractSelfTest.REST_PORT);
+
+        final Job job = Job.getInstance(conf);
+
+        job.setJobName(JOB_NAME);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestMapper.class);
+        job.setReducerClass(TestReducer.class);
+
+        if (!noCombiners)
+            job.setCombinerClass(TestCombiner.class);
+
+        if (noReducers)
+            job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setOutputFormatClass(TestOutputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+
+        job.submit();
+
+        JobID jobId = job.getJobID();
+
+        // Setup phase.
+        JobStatus jobStatus = job.getStatus();
+        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+        assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
+        assert jobStatus.getMapProgress() == 0.0f;
+        assert jobStatus.getReduceProgress() == 0.0f;
+
+        // Ensure that setup progress increases while setup is still blocked.
+        U.sleep(2100);
+
+        JobStatus recentJobStatus = job.getStatus();
+
+        assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() :
+            "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
+
+        // Transferring to map phase.
+        setupLockFile.delete();
+
+        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    return F.eq(1.0f, job.getStatus().getSetupProgress());
+                }
+                catch (Exception e) {
+                    throw new RuntimeException("Unexpected exception.", e);
+                }
+            }
+        }, 5000L) : "Setup progress has not reached 1.0 in time.";
+
+        // Map phase.
+        jobStatus = job.getStatus();
+        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+        assert jobStatus.getSetupProgress() == 1.0f;
+        assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
+        assert jobStatus.getReduceProgress() == 0.0f;
+
+        // Ensure that map progress increases while the mapper is still blocked.
+        U.sleep(2100);
+
+        recentJobStatus = job.getStatus();
+
+        assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() :
+            "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress();
+
+        // Transferring to reduce phase.
+        mapLockFile.delete();
+
+        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    return F.eq(1.0f, job.getStatus().getMapProgress());
+                }
+                catch (Exception e) {
+                    throw new RuntimeException("Unexpected exception.", e);
+                }
+            }
+        }, 5000L) : "Map progress has not reached 1.0 in time.";
+
+        if (!noReducers) {
+            // Reduce phase.
+            jobStatus = job.getStatus();
+            checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+            assert jobStatus.getSetupProgress() == 1.0f;
+            assert jobStatus.getMapProgress() == 1.0f;
+            assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
+
+            // Ensure that reduces progress increases.
+            U.sleep(2100);
+
+            recentJobStatus = job.getStatus();
+
+            assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() :
+                "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
+
+            reduceLockFile.delete();
+        }
+
+        job.waitForCompletion(false);
+
+        // Validate one status snapshot: the state check and the progress assertions below must describe
+        // the same response, so the status is fetched exactly once.
+        jobStatus = job.getStatus();
+        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
+        assert jobStatus.getSetupProgress() == 1.0f;
+        assert jobStatus.getMapProgress() == 1.0f;
+        assert jobStatus.getReduceProgress() == 1.0f;
+
+        dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
+    }
+
+    /**
+     * Prints the given IGFS path to stdout and then, recursively, either its children (for a directory)
+     * or its content line by line (for a file).
+     *
+     * @param igfs IGFS.
+     * @param path Path.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private static void dumpIgfs(IgniteFileSystem igfs, IgfsPath path) throws Exception {
+        IgfsFile info = igfs.info(path);
+
+        assert info != null;
+
+        System.out.println(info.path());
+
+        if (!info.isDirectory()) {
+            // Regular file: echo its content.
+            try (BufferedReader rdr = new BufferedReader(new InputStreamReader(igfs.open(path)))) {
+                for (String line = rdr.readLine(); line != null; line = rdr.readLine())
+                    System.out.println(line);
+            }
+
+            return;
+        }
+
+        // Directory: descend into every child.
+        for (IgfsPath child : igfs.listPaths(path))
+            dumpIgfs(igfs, child);
+    }
+
+    /**
+     * Check job status.
+     * <p>
+     * Compares the job ID, job name, state and cleanup progress of the given status with the expected
+     * values. The checks are plain Java {@code assert} statements, so they are evaluated only when
+     * assertions are enabled.
+     *
+     * @param status Job status.
+     * @param expJobId Expected job ID.
+     * @param expJobName Expected job name.
+     * @param expState Expected state.
+     * @param expCleanupProgress Expected cleanup progress.
+     * @throws Exception If failed.
+     */
+    private static void checkJobStatus(JobStatus status, JobID expJobId, 
String expJobName,
+        JobStatus.State expState, float expCleanupProgress) throws Exception {
+        assert F.eq(status.getJobID(), expJobId) : "Expected=" + expJobId + ", 
actual=" + status.getJobID();
+        assert F.eq(status.getJobName(), expJobName) : "Expected=" + 
expJobName + ", actual=" + status.getJobName();
+        assert F.eq(status.getState(), expState) : "Expected=" + expState + ", 
actual=" + status.getState();
+        assert F.eq(status.getCleanupProgress(), expCleanupProgress) :
+            "Expected=" + expCleanupProgress + ", actual=" + 
status.getCleanupProgress();
+    }
+
+    /**
+     * Builds a Hadoop configuration that selects the Ignite client protocol framework, points the master
+     * address at {@code 127.0.0.1} with the given port and uses the IGFS of test grid 0 as default
+     * file system.
+     *
+     * @param port Port of the master address.
+     * @return Configuration.
+     */
+    private Configuration config(int port) {
+        Configuration cfg = HadoopUtils.safeCreateConfiguration();
+
+        setupFileSystems(cfg);
+
+        String masterAddr = "127.0.0.1:" + port;
+        String dfltFs = "igfs://:" + getTestGridName(0) + "@/";
+
+        cfg.set(MRConfig.FRAMEWORK_NAME, IgniteHadoopClientProtocolProvider.FRAMEWORK_NAME);
+        cfg.set(MRConfig.MASTER_ADDRESS, masterAddr);
+
+        // Applied after setupFileSystems() so that this value is the one that ends up in the configuration.
+        cfg.set("fs.defaultFS", dfltFs);
+
+        return cfg;
+    }
+
+    /**
+     * Creates a protocol provider; a new instance is constructed on every call.
+     *
+     * @return Protocol provider.
+     */
+    private IgniteHadoopClientProtocolProvider provider() {
+        return new IgniteHadoopClientProtocolProvider();
+    }
+
+    /**
+     * Test mapper: waits until the map lock file disappears and then emits {@code (token, 1)} for every
+     * whitespace-separated token of the input value.
+     */
+    public static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
+        /** Reusable writable that carries the current token. */
+        private Text word = new Text();
+
+        /** Constant count of '1' written for every token found. */
+        private static final IntWritable one = new IntWritable(1);
+
+        /** {@inheritDoc} */
+        @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+            // Hold the map phase until the test removes the lock file.
+            while (mapLockFile.exists())
+                Thread.sleep(50);
+
+            for (StringTokenizer tokens = new StringTokenizer(val.toString()); tokens.hasMoreTokens(); ) {
+                word.set(tokens.nextToken());
+
+                ctx.write(word, one);
+            }
+        }
+    }
+
+    /**
+     * Test Hadoop counters.
+     * <p>
+     * {@code COUNTER1} is incremented by the counting mapper, combiner and reducer, {@code COUNTER2} only
+     * by the counting combiner and {@code COUNTER3} only by the counting reducer.
+     */
+    public enum TestCounter {
+        COUNTER1, COUNTER2, COUNTER3
+    }
+
+    /**
+     * Test mapper that uses counters.
+     * <p>
+     * Delegates to {@link TestMapper#map} and then increments {@code COUNTER1} by one, i.e. once per
+     * {@code map()} call.
+     */
+    public static class TestCountingMapper extends TestMapper {
+        /** {@inheritDoc} */
+        @Override public void map(Object key, Text val, Context ctx) throws 
IOException, InterruptedException {
+            super.map(key, val, ctx);
+            ctx.getCounter(TestCounter.COUNTER1).increment(1);
+        }
+    }
+
+    /**
+     * Test combiner that counts invocations.
+     */
+    public static class TestCountingCombiner extends TestReducer {
+        @Override public void reduce(Text key, Iterable<IntWritable> values,
+            Context ctx) throws IOException, InterruptedException {
+            ctx.getCounter(TestCounter.COUNTER1).increment(1);
+            ctx.getCounter(TestCounter.COUNTER2).increment(1);
+
+            int sum = 0;
+            for (IntWritable value : values)
+                sum += value.get();
+
+            ctx.write(key, new IntWritable(sum));
+        }
+    }
+
+    /**
+     * Test reducer that counts invocations.
+     */
+    public static class TestCountingReducer extends TestReducer {
+        @Override public void reduce(Text key, Iterable<IntWritable> values,
+            Context ctx) throws IOException, InterruptedException {
+            ctx.getCounter(TestCounter.COUNTER1).increment(1);
+            ctx.getCounter(TestCounter.COUNTER3).increment(1);
+        }
+    }
+
+    /**
+     * Test combiner.
+     * <p>
+     * Declares no overrides, so its behavior is whatever the base {@code Reducer} provides. It extends
+     * {@code Reducer} directly rather than {@code TestReducer} and therefore never polls the reduce
+     * lock file.
+     */
+    public static class TestCombiner extends Reducer<Text, IntWritable, Text, 
IntWritable> {
+        // No-op.
+    }
+
+    /**
+     * Test output format: a text output format whose committer is wrapped into
+     * {@code TestOutputCommitter}.
+     *
+     * @param <K> Key type.
+     * @param <V> Value type.
+     */
+    public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> {
+        /** {@inheritDoc} */
+        @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
+            throws IOException {
+            // The committer created by the parent becomes the delegate of the test committer.
+            FileOutputCommitter dflt = (FileOutputCommitter)super.getOutputCommitter(ctx);
+
+            return new TestOutputCommitter(ctx, dflt);
+        }
+    }
+
+    /**
+     * Test output committer.
+     * <p>
+     * Holds job setup back while the setup lock file exists and forwards job setup as well as the
+     * task-level operations to the wrapped committer.
+     */
+    private static class TestOutputCommitter extends FileOutputCommitter {
+        /** Delegate. */
+        private final FileOutputCommitter delegate;
+
+        /**
+         * Constructor.
+         *
+         * @param ctx Task attempt context.
+         * @param delegate Delegate.
+         * @throws IOException If failed.
+         */
+        private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException {
+            super(FileOutputFormat.getOutputPath(ctx), ctx);
+
+            this.delegate = delegate;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Polls the setup lock file every 50 ms and delegates only after the file is gone.
+         *
+         * @throws IOException If the waiting thread is interrupted (the interrupt flag is restored and
+         *      the {@link InterruptedException} is attached as the cause) or if the delegate fails.
+         */
+        @Override public void setupJob(JobContext jobCtx) throws IOException {
+            try {
+                while (setupLockFile.exists())
+                    Thread.sleep(50);
+            }
+            catch (InterruptedException e) {
+                // Restore the interrupt flag: it is cleared when InterruptedException is thrown, and code
+                // further up the stack must still be able to observe the interruption.
+                Thread.currentThread().interrupt();
+
+                throw new IOException("Interrupted.", e);
+            }
+
+            delegate.setupJob(jobCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException {
+            delegate.setupTask(taskCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException {
+            return delegate.needsTaskCommit(taskCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException {
+            delegate.commitTask(taskCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException {
+            delegate.abortTask(taskCtx);
+        }
+    }
+
+    /**
+     * Test reducer.
+     */
+    public static class TestReducer extends Reducer<Text, IntWritable, Text, 
IntWritable> {
+        /** Writable container for writing sum of word counts. */
+        private IntWritable totalWordCnt = new IntWritable();
+
+        /** {@inheritDoc} */
+        @Override public void reduce(Text key, Iterable<IntWritable> values, 
Context ctx) throws IOException,
+            InterruptedException {
+            while (reduceLockFile.exists())
+                Thread.sleep(50);
+
+            int wordCnt = 0;
+
+            for (IntWritable value : values)
+                wordCnt += value.get();
+
+            totalWordCnt.set(wordCnt);
+
+            ctx.write(key, totalWordCnt);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java
new file mode 100644
index 0000000..6f910f1
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.IgniteTxConfigCacheSelfTest;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
+ * Test checks that the Hadoop system cache does not use the user-defined TX configuration.
+ */
+public class HadoopTxConfigCacheTest  extends IgniteTxConfigCacheSelfTest {
+    /**
+     * Runs {@code checkImplicitTxSuccess} and {@code checkStartTxSuccess} against the
+     * {@code CU.SYS_CACHE_HADOOP_MR} system cache of node 0. The test succeeds if the operations on the
+     * system cache are not timed out.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSystemCacheTx() throws Exception {
+        final Ignite ignite = grid(0);
+
+        final IgniteInternalCache<Object, Object> hadoopCache = 
getSystemCache(ignite, CU.SYS_CACHE_HADOOP_MR);
+
+        checkImplicitTxSuccess(hadoopCache);
+        checkStartTxSuccess(hadoopCache);
+    }
+}
\ No newline at end of file

Reply via email to