http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
new file mode 100644
index 0000000..88d0f80
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collection;
+import java.util.UUID;
+
+/**
+ * Mock job for planner tests.
+ */
+public class HadoopPlannerMockJob implements HadoopJob {
+    /** Input splits. */
+    private final Collection<HadoopInputSplit> splits;
+
+    /** Reducers count. */
+    private final int reducers;
+
+    /**
+     * Constructor.
+     *
+     * @param splits Input splits.
+     * @param reducers Reducers.
+     */
+    public HadoopPlannerMockJob(Collection<HadoopInputSplit> splits, int 
reducers) {
+        this.splits = splits;
+        this.reducers = reducers;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<HadoopInputSplit> input() throws 
IgniteCheckedException {
+        return splits;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobInfo info() {
+        return new JobInfo(reducers);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobId id() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) 
throws IgniteCheckedException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(boolean external, UUID nodeId) throws 
IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void dispose(boolean external) throws 
IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws 
IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws 
IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupStagingDirectory() {
+        throwUnsupported();
+    }
+
+    /**
+     * Throw {@link UnsupportedOperationException}.
+     */
+    private static void throwUnsupported() {
+        throw new UnsupportedOperationException("Should not be called!");
+    }
+
+    /**
+     * Mocked job info.
+     */
+    private static class JobInfo implements HadoopJobInfo {
+        /** Reducers. */
+        private final int reducers;
+
+        /**
+         * Constructor.
+         *
+         * @param reducers Reducers.
+         */
+        public JobInfo(int reducers) {
+            this.reducers = reducers;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int reducers() {
+            return reducers;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String property(String name) {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasCombiner() {
+            throwUnsupported();
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasReducer() {
+            throwUnsupported();
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopJob createJob(Class<? extends HadoopJob> 
jobCls, HadoopJobId jobId, IgniteLogger log,
+            @Nullable String[] libNames) throws IgniteCheckedException {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String jobName() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String user() {
+            throwUnsupported();
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
new file mode 100644
index 0000000..3f825b0
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static com.google.common.collect.Maps.immutableEntry;
+import static com.google.common.collect.MinMaxPriorityQueue.orderedBy;
+import static java.util.Collections.reverseOrder;
+
+/**
+ * Hadoop-based 10 popular words example: all files in a given directory are 
tokenized and for each word longer than
+ * 3 characters the number of occurrences ins calculated. Finally, 10 words 
with the highest occurrence count are
+ * output.
+ *
+ * NOTE: in order to run this example on Windows please ensure that cygwin is 
installed and available in the system
+ * path.
+ */
+public class HadoopPopularWordsTest {
+    /** Ignite home. */
+    private static final String IGNITE_HOME = U.getIgniteHome();
+
+    /** The path to the input directory. ALl files in that directory will be 
processed. */
+    private static final Path BOOKS_LOCAL_DIR =
+        new Path("file:" + IGNITE_HOME, 
"modules/tests/java/org/apache/ignite/grid/hadoop/books");
+
+    /** The path to the output directory. THe result file will be written to 
this location. */
+    private static final Path RESULT_LOCAL_DIR =
+        new Path("file:" + IGNITE_HOME, 
"modules/tests/java/org/apache/ignite/grid/hadoop/output");
+
+    /** Popular books source dir in DFS. */
+    private static final Path BOOKS_DFS_DIR = new 
Path("tmp/word-count-example/in");
+
+    /** Popular books source dir in DFS. */
+    private static final Path RESULT_DFS_DIR = new 
Path("tmp/word-count-example/out");
+
+    /** Path to the distributed file system configuration. */
+    private static final String DFS_CFG = 
"examples/config/filesystem/core-site.xml";
+
+    /** Top N words to select **/
+    private static final int POPULAR_WORDS_CNT = 10;
+
+    /**
+     * For each token in the input string the mapper emits a {word, 1} pair.
+     */
+    private static class TokenizingMapper extends Mapper<LongWritable, Text, 
Text, IntWritable> {
+        /** Constant value. */
+        private static final IntWritable ONE = new IntWritable(1);
+
+        /** The word converted into the Text. */
+        private Text word = new Text();
+
+        /**
+         * Emits a entry where the key is the word and the value is always 1.
+         *
+         * @param key the current position in the input file (not used here)
+         * @param val the text string
+         * @param ctx mapper context
+         * @throws IOException
+         * @throws InterruptedException
+         */
+        @Override protected void map(LongWritable key, Text val, Context ctx)
+            throws IOException, InterruptedException {
+            // Get the mapped object.
+            final String line = val.toString();
+
+            // Splits the given string to words.
+            final String[] words = line.split("[^a-zA-Z0-9]");
+
+            for (final String w : words) {
+                // Only emit counts for longer words.
+                if (w.length() <= 3)
+                    continue;
+
+                word.set(w);
+
+                // Write the word into the context with the initial count 
equals 1.
+                ctx.write(word, ONE);
+            }
+        }
+    }
+
+    /**
+     * The reducer uses a priority queue to rank the words based on its number 
of occurrences.
+     */
+    private static class TopNWordsReducer extends Reducer<Text, IntWritable, 
Text, IntWritable> {
+        private MinMaxPriorityQueue<Entry<Integer, String>> q;
+
+        TopNWordsReducer() {
+            q = orderedBy(reverseOrder(new Comparator<Entry<Integer, 
String>>() {
+                @Override public int compare(Entry<Integer, String> o1, 
Entry<Integer, String> o2) {
+                    return o1.getKey().compareTo(o2.getKey());
+                }
+            
})).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create();
+        }
+
+        /**
+         * This method doesn't emit anything, but just keeps track of the top 
N words.
+         *
+         * @param key The word.
+         * @param vals The words counts.
+         * @param ctx Reducer context.
+         * @throws IOException If failed.
+         * @throws InterruptedException If failed.
+         */
+        @Override public void reduce(Text key, Iterable<IntWritable> vals, 
Context ctx) throws IOException,
+            InterruptedException {
+            int sum = 0;
+
+            for (IntWritable val : vals)
+                sum += val.get();
+
+            q.add(immutableEntry(sum, key.toString()));
+        }
+
+        /**
+         * This method is called after all the word entries have been 
processed. It writes the accumulated
+         * statistics to the job output file.
+         *
+         * @param ctx The job context.
+         * @throws IOException If failed.
+         * @throws InterruptedException If failed.
+         */
+        @Override protected void cleanup(Context ctx) throws IOException, 
InterruptedException {
+            IntWritable i = new IntWritable();
+
+            Text txt = new Text();
+
+            // iterate in desc order
+            while (!q.isEmpty()) {
+                Entry<Integer, String> e = q.removeFirst();
+
+                i.set(e.getKey());
+
+                txt.set(e.getValue());
+
+                ctx.write(txt, i);
+            }
+        }
+    }
+
+    /**
+     * Configures the Hadoop MapReduce job.
+     *
+     * @return Instance of the Hadoop MapRed job.
+     * @throws IOException If failed.
+     */
+    @SuppressWarnings("deprecation")
+    private Job createConfigBasedHadoopJob() throws IOException {
+        Job jobCfg = new Job();
+
+        Configuration cfg = jobCfg.getConfiguration();
+
+        // Use explicit configuration of distributed file system, if provided.
+        cfg.addResource(U.resolveIgniteUrl(DFS_CFG));
+
+        jobCfg.setJobName("HadoopPopularWordExample");
+        jobCfg.setJarByClass(HadoopPopularWordsTest.class);
+        jobCfg.setInputFormatClass(TextInputFormat.class);
+        jobCfg.setOutputKeyClass(Text.class);
+        jobCfg.setOutputValueClass(IntWritable.class);
+        jobCfg.setMapperClass(TokenizingMapper.class);
+        jobCfg.setReducerClass(TopNWordsReducer.class);
+
+        FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR);
+        FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR);
+
+        // Local job tracker allows the only task per wave, but text input 
format
+        // replaces it with the calculated value based on input split size 
option.
+        if ("local".equals(cfg.get("mapred.job.tracker", "local"))) {
+            // Split job into tasks using 32MB split size.
+            FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024);
+            FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE);
+        }
+
+        return jobCfg;
+    }
+
+    /**
+     * Runs the Hadoop job.
+     *
+     * @return {@code True} if succeeded, {@code false} otherwise.
+     * @throws Exception If failed.
+     */
+    private boolean runWordCountConfigBasedHadoopJob() throws Exception {
+        Job job = createConfigBasedHadoopJob();
+
+        // Distributed file system this job will work with.
+        FileSystem fs = FileSystem.get(job.getConfiguration());
+
+        X.println(">>> Using distributed file system: " + 
fs.getHomeDirectory());
+
+        // Prepare input and output job directories.
+        prepareDirectories(fs);
+
+        long time = System.currentTimeMillis();
+
+        // Run job.
+        boolean res = job.waitForCompletion(true);
+
+        X.println(">>> Job execution time: " + (System.currentTimeMillis() - 
time) / 1000 + " sec.");
+
+        // Move job results into local file system, so you can view calculated 
results.
+        publishResults(fs);
+
+        return res;
+    }
+
+    /**
+     * Prepare job's data: cleanup result directories that might have left over
+     * after previous runs, copy input files from the local file system into 
DFS.
+     *
+     * @param fs Distributed file system to use in job.
+     * @throws IOException If failed.
+     */
+    private void prepareDirectories(FileSystem fs) throws IOException {
+        X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR);
+
+        fs.delete(RESULT_DFS_DIR, true);
+
+        X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.delete(BOOKS_DFS_DIR, true);
+
+        X.println(">>> Copy local files into DFS input directory: " + 
BOOKS_DFS_DIR);
+
+        fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR);
+    }
+
+    /**
+     * Publish job execution results into local file system, so you can view 
them.
+     *
+     * @param fs Distributed file sytem used in job.
+     * @throws IOException If failed.
+     */
+    private void publishResults(FileSystem fs) throws IOException {
+        X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.delete(BOOKS_DFS_DIR, true);
+
+        X.println(">>> Cleaning up LOCAL result directory: " + 
RESULT_LOCAL_DIR);
+
+        fs.delete(RESULT_LOCAL_DIR, true);
+
+        X.println(">>> Moving job results into LOCAL result directory: " + 
RESULT_LOCAL_DIR);
+
+        fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR);
+    }
+
+    /**
+     * Executes a modified version of the Hadoop word count example. Here, in 
addition to counting the number of
+     * occurrences of the word in the source files, the N most popular words 
are selected.
+     *
+     * @param args None.
+     */
+    public static void main(String[] args) {
+        try {
+            new HadoopPopularWordsTest().runWordCountConfigBasedHadoopJob();
+        }
+        catch (Exception e) {
+            X.println(">>> Failed to run word count example: " + 
e.getMessage());
+        }
+
+        System.exit(0);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
new file mode 100644
index 0000000..789a6b3
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import 
org.apache.ignite.internal.processors.hadoop.v2.HadoopSerializationWrapper;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test of wrapper of the native serialization.
+ */
+public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest 
{
+    /**
+     * Tests read/write of IntWritable via native WritableSerialization.
+     * @throws Exception If fails.
+     */
+    public void testIntWritableSerialization() throws Exception {
+        HadoopSerialization ser = new HadoopSerializationWrapper(new 
WritableSerialization(), IntWritable.class);
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        DataOutput out = new DataOutputStream(buf);
+
+        ser.write(out, new IntWritable(3));
+        ser.write(out, new IntWritable(-5));
+
+        assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", 
Arrays.toString(buf.toByteArray()));
+
+        DataInput in = new DataInputStream(new 
ByteArrayInputStream(buf.toByteArray()));
+
+        assertEquals(3, ((IntWritable)ser.read(in, null)).get());
+        assertEquals(-5, ((IntWritable)ser.read(in, null)).get());
+    }
+
+    /**
+     * Tests read/write of Integer via native JavaleSerialization.
+     * @throws Exception If fails.
+     */
+    public void testIntJavaSerialization() throws Exception {
+        HadoopSerialization ser = new HadoopSerializationWrapper(new 
JavaSerialization(), Integer.class);
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        DataOutput out = new DataOutputStream(buf);
+
+        ser.write(out, 3);
+        ser.write(out, -5);
+        ser.close();
+
+        DataInput in = new DataInputStream(new 
ByteArrayInputStream(buf.toByteArray()));
+
+        assertEquals(3, ((Integer)ser.read(in, null)).intValue());
+        assertEquals(-5, ((Integer)ser.read(in, null)).intValue());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
new file mode 100644
index 0000000..7552028
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.concurrent.ConcurrentMap;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * For tests.
+ */
+public class HadoopSharedMap {
+    /** */
+    private static final ConcurrentMap<String, HadoopSharedMap> maps = new 
ConcurrentHashMap8<>();
+
+    /** */
+    private final ConcurrentMap<String, Object> map = new 
ConcurrentHashMap8<>();
+
+    /**
+     * Private.
+     */
+    private HadoopSharedMap() {
+        // No-op.
+    }
+
+    /**
+     * Puts object by key.
+     *
+     * @param key Key.
+     * @param val Value.
+     */
+    public <T> T put(String key, T val) {
+        Object old = map.putIfAbsent(key, val);
+
+        return old == null ? val : (T)old;
+    }
+
+    /**
+     * @param cls Class.
+     * @return Map of static fields.
+     */
+    public static HadoopSharedMap map(Class<?> cls) {
+        HadoopSharedMap m = maps.get(cls.getName());
+
+        if (m != null)
+            return m;
+
+        HadoopSharedMap old = maps.putIfAbsent(cls.getName(), m = new 
HadoopSharedMap());
+
+        return old == null ? m : old;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
new file mode 100644
index 0000000..27a5fcd
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Same test as HadoopMapReduceTest, but with enabled Snappy output 
compression.
+ */
+public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest {
+    /** {@inheritDoc} */
+    @Override protected boolean compressOutputSnappy() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean[][] getApiModes() {
+        return new boolean[][] {
+            { false, false, true },
+            { true, true, true },
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
new file mode 100644
index 0000000..b4e3dc2
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests isolated Hadoop Snappy codec usage.
+ */
+public class HadoopSnappyTest extends GridCommonAbstractTest {
+    /** Length of data. */
+    private static final int BYTE_SIZE = 1024 * 50;
+
+    /**
+     * Checks Snappy codec usage.
+     *
+     * @throws Exception On error.
+     */
+    public void testSnappy() throws Throwable {
+        // Run Snappy test in default class loader:
+        checkSnappy();
+
+        // Run the same in several more class loaders simulating jobs and 
tasks:
+        for (int i = 0; i < 2; i++) {
+            ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i, 
null);
+
+            Class<?> cls = 
(Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr);
+
+            assertEquals(hadoopClsLdr, cls.getClassLoader());
+
+            U.invoke(cls, null, "checkSnappy");
+        }
+    }
+
+    /**
+     * Internal check routine.
+     *
+     * @throws Throwable If failed.
+     */
+    public static void checkSnappy() throws Throwable {
+        try {
+            byte[] expBytes = new byte[BYTE_SIZE];
+            byte[] actualBytes = new byte[BYTE_SIZE];
+
+            for (int i = 0; i < expBytes.length ; i++)
+                expBytes[i] = (byte)ThreadLocalRandom.current().nextInt(16);
+
+            SnappyCodec codec = new SnappyCodec();
+
+            codec.setConf(new Configuration());
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+            try (CompressionOutputStream cos = codec.createOutputStream(baos)) 
{
+                cos.write(expBytes);
+                cos.flush();
+            }
+
+            try (CompressionInputStream cis = codec.createInputStream(new 
ByteArrayInputStream(baos.toByteArray()))) {
+                int read = cis.read(actualBytes, 0, actualBytes.length);
+
+                assert read == actualBytes.length;
+            }
+
+            assert Arrays.equals(expBytes, actualBytes);
+        }
+        catch (Throwable e) {
+            System.out.println("Snappy check failed:");
+            System.out.println("### NativeCodeLoader.isNativeCodeLoaded:  " + 
NativeCodeLoader.isNativeCodeLoaded());
+            System.out.println("### SnappyCompressor.isNativeCodeLoaded:  " + 
SnappyCompressor.isNativeCodeLoaded());
+
+            throw e;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
new file mode 100644
index 0000000..dff5e70
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+
+/**
+ * External test for sorting.
+ */
+public class HadoopSortingExternalTest extends HadoopSortingTest {
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(new JdkMarshaller());
+
+        return cfg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
new file mode 100644
index 0000000..20f5eef
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Scanner;
+import java.util.UUID;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.util.typedef.X;
+
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
+
+/**
+ * Tests correct sorting.
+ */
+public class HadoopSortingTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String PATH_INPUT = "/test-in";
+
+    /** */
+    private static final String PATH_OUTPUT = "/test-out";
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * @return {@code True} if IGFS is enabled on Hadoop nodes.
+     */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSortSimple() throws Exception {
+        // Generate test data.
+        Job job = Job.getInstance();
+
+        job.setInputFormatClass(InFormat.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(NullWritable.class);
+
+        job.setMapperClass(Mapper.class);
+        job.setNumReduceTasks(0);
+
+        setupFileSystems(job.getConfiguration());
+
+        FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + 
PATH_INPUT));
+
+        X.printerrln("Data generation started.");
+
+        grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+            createJobInfo(job.getConfiguration())).get(180000);
+
+        X.printerrln("Data generation complete.");
+
+        // Run main map-reduce job.
+        job = Job.getInstance();
+
+        setupFileSystems(job.getConfiguration());
+
+        
job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, 
JavaSerialization.class.getName() +
+            "," + WritableSerialization.class.getName());
+
+        FileInputFormat.setInputPaths(job, new Path(igfsScheme() + 
PATH_INPUT));
+        FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + 
PATH_OUTPUT));
+
+        job.setSortComparatorClass(JavaSerializationComparator.class);
+
+        job.setMapperClass(MyMapper.class);
+        job.setReducerClass(MyReducer.class);
+
+        job.setNumReduceTasks(2);
+
+        job.setMapOutputKeyClass(UUID.class);
+        job.setMapOutputValueClass(NullWritable.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(NullWritable.class);
+
+        X.printerrln("Job started.");
+
+        grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
+            createJobInfo(job.getConfiguration())).get(180000);
+
+        X.printerrln("Job complete.");
+
+        // Check result.
+        Path outDir = new Path(igfsScheme() + PATH_OUTPUT);
+
+        AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), 
job.getConfiguration());
+
+        for (FileStatus file : fs.listStatus(outDir)) {
+            X.printerrln("__ file: " + file);
+
+            if (file.getLen() == 0)
+                continue;
+
+            FSDataInputStream in = fs.open(file.getPath());
+
+            Scanner sc = new Scanner(in);
+
+            UUID prev = null;
+
+            while(sc.hasNextLine()) {
+                UUID next = UUID.fromString(sc.nextLine());
+
+//                X.printerrln("___ check: " + next);
+
+                if (prev != null)
+                    assertTrue(prev.compareTo(next) < 0);
+
+                prev = next;
+            }
+        }
+    }
+
+    public static class InFormat extends InputFormat<Text, NullWritable> {
+        /** {@inheritDoc} */
+        @Override public List<InputSplit> getSplits(JobContext ctx) throws 
IOException, InterruptedException {
+            List<InputSplit> res = new ArrayList<>();
+
+            FakeSplit split = new FakeSplit(20);
+
+            for (int i = 0; i < 10; i++)
+                res.add(split);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RecordReader<Text, NullWritable> 
createRecordReader(final InputSplit split,
+            TaskAttemptContext ctx) throws IOException, InterruptedException {
+            return new RecordReader<Text, NullWritable>() {
+                /** */
+                int cnt;
+
+                /** */
+                Text txt = new Text();
+
+                @Override public void initialize(InputSplit split, 
TaskAttemptContext ctx) {
+                    // No-op.
+                }
+
+                @Override public boolean nextKeyValue() throws IOException, 
InterruptedException {
+                    return ++cnt <= split.getLength();
+                }
+
+                @Override public Text getCurrentKey() {
+                    txt.set(UUID.randomUUID().toString());
+
+//                    X.printerrln("___ read: " + txt);
+
+                    return txt;
+                }
+
+                @Override public NullWritable getCurrentValue() {
+                    return NullWritable.get();
+                }
+
+                @Override public float getProgress() throws IOException, 
InterruptedException {
+                    return (float)cnt / split.getLength();
+                }
+
+                @Override public void close() {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    public static class MyMapper extends Mapper<LongWritable, Text, UUID, 
NullWritable> {
+        /** {@inheritDoc} */
+        @Override protected void map(LongWritable key, Text val, Context ctx) 
throws IOException, InterruptedException {
+//            X.printerrln("___ map: " + val);
+
+            ctx.write(UUID.fromString(val.toString()), NullWritable.get());
+        }
+    }
+
+    public static class MyReducer extends Reducer<UUID, NullWritable, Text, 
NullWritable> {
+        /** */
+        private Text text = new Text();
+
+        /** {@inheritDoc} */
+        @Override protected void reduce(UUID key, Iterable<NullWritable> vals, 
Context ctx)
+            throws IOException, InterruptedException {
+//            X.printerrln("___ rdc: " + key);
+
+            text.set(key.toString());
+
+            ctx.write(text, NullWritable.get());
+        }
+    }
+
+    public static class FakeSplit extends InputSplit implements Writable {
+        /** */
+        private static final String[] HOSTS = {"127.0.0.1"};
+
+        /** */
+        private int len;
+
+        /**
+         * @param len Length.
+         */
+        public FakeSplit(int len) {
+            this.len = len;
+        }
+
+        /**
+         *
+         */
+        public FakeSplit() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getLength() throws IOException, 
InterruptedException {
+            return len;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String[] getLocations() throws IOException, 
InterruptedException {
+            return HOSTS;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(DataOutput out) throws IOException {
+            out.writeInt(len);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFields(DataInput in) throws IOException {
+            len = in.readInt();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
new file mode 100644
index 0000000..11c3907
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Self test of {@link 
org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper}.
+ */
+public class HadoopSplitWrapperSelfTest extends HadoopAbstractSelfTest {
+    /**
+     * Tests serialization of wrapper and the wrapped native split.
+     * @throws Exception If fails.
+     */
+    public void testSerialization() throws Exception {
+        FileSplit nativeSplit = new FileSplit(new Path("/path/to/file"), 100, 
500, new String[]{"host1", "host2"});
+
+        assertEquals("/path/to/file:100+500", nativeSplit.toString());
+
+        HadoopSplitWrapper split = HadoopUtils.wrapSplit(10, nativeSplit, 
nativeSplit.getLocations());
+
+        assertEquals("[host1, host2]", Arrays.toString(split.hosts()));
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        ObjectOutput out = new ObjectOutputStream(buf);
+
+        out.writeObject(split);
+
+        ObjectInput in = new ObjectInputStream(new 
ByteArrayInputStream(buf.toByteArray()));
+
+        final HadoopSplitWrapper res = (HadoopSplitWrapper)in.readObject();
+
+        assertEquals("/path/to/file:100+500", 
HadoopUtils.unwrapSplit(res).toString());
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                res.hosts();
+
+                return null;
+            }
+        }, AssertionError.class, null);
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
new file mode 100644
index 0000000..820a1f3
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.typedef.G;
+
+/**
+ * Hadoop node startup.
+ */
+public class HadoopStartup {
+    /**
+     * @param args Arguments.
+     */
+    public static void main(String[] args) {
+        G.start("config/hadoop/default-config.xml");
+    }
+
+    /**
+     * @return Configuration for job run.
+     */
+    @SuppressWarnings("UnnecessaryFullyQualifiedName")
+    public static Configuration configuration() {
+        Configuration cfg = new Configuration();
+
+        cfg.set("fs.defaultFS", "igfs://igfs@localhost");
+
+        cfg.set("fs.igfs.impl", 
org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName());
+        cfg.set("fs.AbstractFileSystem.igfs.impl", 
IgniteHadoopFileSystem.class.getName());
+
+        cfg.set("dfs.client.block.write.replace-datanode-on-failure.policy", 
"NEVER");
+
+        cfg.set("mapreduce.framework.name", "ignite");
+        cfg.set("mapreduce.jobtracker.address", "localhost:11211");
+
+        return cfg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
new file mode 100644
index 0000000..431433e
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
+
+/**
+ * Tests map-reduce task execution basics.
+ */
+public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
+    /** */
+    private static HadoopSharedMap m = 
HadoopSharedMap.map(HadoopTaskExecutionSelfTest.class);
+
+    /** Line count. */
+    private static final AtomicInteger totalLineCnt = m.put("totalLineCnt", 
new AtomicInteger());
+
+    /** Executed tasks. */
+    private static final AtomicInteger executedTasks = m.put("executedTasks", 
new AtomicInteger());
+
+    /** Cancelled tasks. */
+    private static final AtomicInteger cancelledTasks = 
m.put("cancelledTasks", new AtomicInteger());
+
+    /** Working directory of each task. */
+    private static final Map<String, String> taskWorkDirs = 
m.put("taskWorkDirs",
+        new ConcurrentHashMap<String, String>());
+
+    /** Mapper id to fail. */
+    private static final AtomicInteger failMapperId = m.put("failMapperId", 
new AtomicInteger());
+
+    /** Number of splits of the current input. */
+    private static final AtomicInteger splitsCount = m.put("splitsCount", new 
AtomicInteger());
+
+    /** Test param. */
+    private static final String MAP_WRITE = "test.map.write";
+
+
+    /** {@inheritDoc} */
+    @Override public FileSystemConfiguration igfsConfiguration() throws 
Exception {
+        FileSystemConfiguration cfg = super.igfsConfiguration();
+
+        cfg.setFragmentizerEnabled(false);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        grid(0).fileSystem(igfsName).format();
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        cfg.setMaxParallelTasks(5);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMapRun() throws Exception {
+        int lineCnt = 10000;
+        String fileName = "/testFile";
+
+        prepareFile(fileName, lineCnt);
+
+        totalLineCnt.set(0);
+        taskWorkDirs.clear();
+
+        Configuration cfg = new Configuration();
+
+        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestMapper.class);
+
+        job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(TextInputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + 
getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + 
getTestGridName(0) + "@/output/"));
+
+        job.setJarByClass(getClass());
+
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new 
HadoopJobId(UUID.randomUUID(), 1),
+                createJobInfo(job.getConfiguration()));
+
+        fut.get();
+
+        assertEquals(lineCnt, totalLineCnt.get());
+
+        assertEquals(32, taskWorkDirs.size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMapCombineRun() throws Exception {
+        int lineCnt = 10001;
+        String fileName = "/testFile";
+
+        prepareFile(fileName, lineCnt);
+
+        totalLineCnt.set(0);
+        taskWorkDirs.clear();
+
+        Configuration cfg = new Configuration();
+
+        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+        cfg.setBoolean(MAP_WRITE, true);
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestMapper.class);
+        job.setCombinerClass(TestCombiner.class);
+        job.setReducerClass(TestReducer.class);
+
+        job.setNumReduceTasks(2);
+
+        job.setInputFormatClass(TextInputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + 
getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + 
getTestGridName(0) + "@/output"));
+
+        job.setJarByClass(getClass());
+
+        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);
+
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, 
createJobInfo(job.getConfiguration()));
+
+        fut.get();
+
+        assertEquals(lineCnt, totalLineCnt.get());
+
+        assertEquals(34, taskWorkDirs.size());
+
+        for (int g = 0; g < gridCount(); g++)
+            grid(g).hadoop().finishFuture(jobId).get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMapperException() throws Exception {
+        prepareFile("/testFile", 1000);
+
+        Configuration cfg = new Configuration();
+
+        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(FailMapper.class);
+
+        job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(TextInputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + 
getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + 
getTestGridName(0) + "@/output/"));
+
+        job.setJarByClass(getClass());
+
+        final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new 
HadoopJobId(UUID.randomUUID(), 3),
+                createJobInfo(job.getConfiguration()));
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                fut.get();
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+    }
+
+    /**
+     * @param fileName File name.
+     * @param lineCnt Line count.
+     * @throws Exception If failed.
+     */
+    private void prepareFile(String fileName, int lineCnt) throws Exception {
+        IgniteFileSystem igfs = grid(0).fileSystem(igfsName);
+
+        try (OutputStream os = igfs.create(new IgfsPath(fileName), true)) {
+            PrintWriter w = new PrintWriter(new OutputStreamWriter(os));
+
+            for (int i = 0; i < lineCnt; i++)
+                w.print("Hello, Hadoop map-reduce!\n");
+
+            w.flush();
+        }
+    }
+
+    /**
+     * Prepare job with mappers to cancel.
+     * @return Fully configured job.
+     * @throws Exception If fails.
+     */
+    private Configuration prepareJobForCancelling() throws Exception {
+        prepareFile("/testFile", 1500);
+
+        executedTasks.set(0);
+        cancelledTasks.set(0);
+        failMapperId.set(0);
+        splitsCount.set(0);
+
+        Configuration cfg = new Configuration();
+
+        setupFileSystems(cfg);
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(CancellingTestMapper.class);
+
+        job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(InFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + 
getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + 
getTestGridName(0) + "@/output/"));
+
+        job.setJarByClass(getClass());
+
+        return job.getConfiguration();
+    }
+
+    /**
+     * Test input format.
+     */
+    private static class InFormat extends TextInputFormat {
+        @Override public List<InputSplit> getSplits(JobContext ctx) throws 
IOException {
+            List<InputSplit> res = super.getSplits(ctx);
+
+            splitsCount.set(res.size());
+
+            X.println("___ split of input: " + splitsCount.get());
+
+            return res;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTaskCancelling() throws Exception {
+        Configuration cfg = prepareJobForCancelling();
+
+        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+        final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, 
createJobInfo(cfg));
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return splitsCount.get() > 0;
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            assertTrue(false);
+        }
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return executedTasks.get() == splitsCount.get();
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            assertTrue(false);
+        }
+
+        // Fail mapper with id "1", cancels others
+        failMapperId.set(1);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                fut.get();
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+
+        assertEquals(executedTasks.get(), cancelledTasks.get() + 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJobKill() throws Exception {
+        Configuration cfg = prepareJobForCancelling();
+
+        Hadoop hadoop = grid(0).hadoop();
+
+        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+        //Kill unknown job.
+        boolean killRes = hadoop.kill(jobId);
+
+        assertFalse(killRes);
+
+        final IgniteInternalFuture<?> fut = hadoop.submit(jobId, 
createJobInfo(cfg));
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return splitsCount.get() > 0;
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            assertTrue(false);
+        }
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                X.println("___ executed tasks: " + executedTasks.get());
+
+                return executedTasks.get() == splitsCount.get();
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            fail();
+        }
+
+        //Kill really ran job.
+        killRes = hadoop.kill(jobId);
+
+        assertTrue(killRes);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                fut.get();
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+
+        assertEquals(executedTasks.get(), cancelledTasks.get());
+
+        //Kill the same job again.
+        killRes = hadoop.kill(jobId);
+
+        assertFalse(killRes);
+    }
+
+    private static class CancellingTestMapper extends Mapper<Object, Text, 
Text, IntWritable> {
+        private int mapperId;
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, 
InterruptedException {
+            mapperId = executedTasks.incrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run(Context ctx) throws IOException, 
InterruptedException {
+            try {
+                super.run(ctx);
+            }
+            catch (HadoopTaskCancelledException e) {
+                cancelledTasks.incrementAndGet();
+
+                throw e;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void map(Object key, Text val, Context ctx) throws 
IOException, InterruptedException {
+            if (mapperId == failMapperId.get())
+                throw new IOException();
+
+            Thread.sleep(1000);
+        }
+    }
+
+    /**
+     * Test failing mapper.
+     */
+    private static class FailMapper extends Mapper<Object, Text, Text, 
IntWritable> {
+        /** {@inheritDoc} */
+        @Override protected void map(Object key, Text val, Context ctx) throws 
IOException, InterruptedException {
+            throw new IOException("Expected");
+        }
+    }
+
+    /**
+     * Mapper calculates number of lines.
+     */
+    private static class TestMapper extends Mapper<Object, Text, Text, 
IntWritable> {
+        /** Writable integer constant of '1'. */
+        private static final IntWritable ONE = new IntWritable(1);
+
+        /** Line count constant. */
+        public static final Text LINE_COUNT = new Text("lineCount");
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, 
InterruptedException {
+            X.println("___ Mapper: " + ctx.getTaskAttemptID());
+
+            String taskId = ctx.getTaskAttemptID().toString();
+
+            LocalFileSystem locFs = 
FileSystem.getLocal(ctx.getConfiguration());
+
+            String workDir = locFs.getWorkingDirectory().toString();
+
+            assertNull(taskWorkDirs.put(workDir, taskId));
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void map(Object key, Text val, Context ctx) throws 
IOException, InterruptedException {
+            if (ctx.getConfiguration().getBoolean(MAP_WRITE, false))
+                ctx.write(LINE_COUNT, ONE);
+            else
+                totalLineCnt.incrementAndGet();
+        }
+    }
+
+    /**
+     * Combiner calculates number of lines.
+     */
+    private static class TestCombiner extends Reducer<Text, IntWritable, Text, 
IntWritable> {
+        /** */
+        IntWritable sum = new IntWritable();
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, 
InterruptedException {
+            X.println("___ Combiner: ");
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void reduce(Text key, Iterable<IntWritable> 
values, Context ctx) throws IOException,
+            InterruptedException {
+            int lineCnt = 0;
+
+            for (IntWritable value : values)
+                lineCnt += value.get();
+
+            sum.set(lineCnt);
+
+            X.println("___ combo: " + lineCnt);
+
+            ctx.write(key, sum);
+        }
+    }
+
+    /**
+     * Combiner calculates number of lines.
+     */
+    private static class TestReducer extends Reducer<Text, IntWritable, Text, 
IntWritable> {
+        /** */
+        IntWritable sum = new IntWritable();
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, 
InterruptedException {
+            X.println("___ Reducer: " + ctx.getTaskAttemptID());
+
+            String taskId = ctx.getTaskAttemptID().toString();
+            String workDir = 
FileSystem.getLocal(ctx.getConfiguration()).getWorkingDirectory().toString();
+
+            assertNull(taskWorkDirs.put(workDir, taskId));
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void reduce(Text key, Iterable<IntWritable> 
values, Context ctx) throws IOException,
+            InterruptedException {
+            int lineCnt = 0;
+
+            for (IntWritable value : values) {
+                lineCnt += value.get();
+
+                X.println("___ rdcr: " + value.get());
+            }
+
+            sum.set(lineCnt);
+
+            ctx.write(key, sum);
+
+            X.println("___ RDCR SUM: " + lineCnt);
+
+            totalLineCnt.addAndGet(lineCnt);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
new file mode 100644
index 0000000..7c6d244
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import com.google.common.base.Joiner;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
+
+/**
+ * Tests of Map, Combine and Reduce task executions of any version of hadoop 
API.
+ */
+abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
+    /** Empty hosts array. */
+    private static final String[] HOSTS = new String[0];
+
+    /**
+     * Creates some grid hadoop job. Override this method to create tests for 
any job implementation.
+     *
+     * @param inFile Input file name for the job.
+     * @param outFile Output file name for the job.
+     * @return Hadoop job.
+     * @throws IOException If fails.
+     */
+    public abstract HadoopJob getHadoopJob(String inFile, String outFile) 
throws Exception;
+
+    /**
+     * @return prefix of reducer output file name. It's "part-" for v1 and 
"part-r-" for v2 API
+     */
+    public abstract String getOutputFileNamePrefix();
+
+    /**
+     * Tests map task execution.
+     *
+     * @throws Exception If fails.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void testMapTask() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, 
HadoopWordCount2.class.getSimpleName() + "-input");
+
+        URI inFileUri = URI.create(igfsScheme() + inFile.toString());
+
+        try (PrintWriter pw = new PrintWriter(igfs.create(inFile, true))) {
+            pw.println("hello0 world0");
+            pw.println("world1 hello1");
+        }
+
+        HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, 
igfs.info(inFile).length() - 1);
+
+        try (PrintWriter pw = new PrintWriter(igfs.append(inFile, false))) {
+            pw.println("hello2 world2");
+            pw.println("world3 hello3");
+        }
+        HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, 
fileBlock1.length(),
+                igfs.info(inFile).length() - fileBlock1.length());
+
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), 
igfsScheme() + PATH_OUTPUT);
+
+        HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, 
gridJob.id(), 0, 0, fileBlock1);
+
+        HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, 
gridJob);
+
+        ctx.mockOutput().clear();
+
+        ctx.run();
+
+        assertEquals("hello0,1; world0,1; world1,1; hello1,1", Joiner.on("; 
").join(ctx.mockOutput()));
+
+        ctx.mockOutput().clear();
+
+        ctx.taskInfo(new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 
0, fileBlock2));
+
+        ctx.run();
+
+        assertEquals("hello2,1; world2,1; world3,1; hello3,1", Joiner.on("; 
").join(ctx.mockOutput()));
+    }
+
+    /**
+     * Generates input data for reduce-like operation into mock context input 
and runs the operation.
+     *
+     * @param gridJob Job is to create reduce task from.
+     * @param taskType Type of task - combine or reduce.
+     * @param taskNum Number of task in job.
+     * @param words Pairs of words and its counts.
+     * @return Context with mock output.
+     * @throws IgniteCheckedException If fails.
+     */
+    private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, 
HadoopTaskType taskType,
+        int taskNum, String... words) throws IgniteCheckedException {
+        HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), 
taskNum, 0, null);
+
+        HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, 
gridJob);
+
+        for (int i = 0; i < words.length; i+=2) {
+            List<IntWritable> valList = new ArrayList<>();
+
+            for (int j = 0; j < Integer.parseInt(words[i + 1]); j++)
+                valList.add(new IntWritable(1));
+
+            ctx.mockInput().put(new Text(words[i]), valList);
+        }
+
+        ctx.run();
+
+        return ctx;
+    }
+
+    /**
+     * Tests reduce task execution.
+     *
+     * @throws Exception If fails.
+     */
+    public void testReduceTask() throws Exception {
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, 
igfsScheme() + PATH_OUTPUT);
+
+        runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", 
"word2", "10");
+        runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", 
"word4", "15");
+
+        assertEquals(
+            "word1\t5\n" +
+            "word2\t10\n",
+            readAndSortFile(PATH_OUTPUT + 
"/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000000/" +
+                getOutputFileNamePrefix() + "00000")
+        );
+
+        assertEquals(
+            "word3\t7\n" +
+            "word4\t15\n",
+            readAndSortFile(PATH_OUTPUT + 
"/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000001/" +
+                getOutputFileNamePrefix() + "00001")
+        );
+    }
+
+    /**
+     * Tests combine task execution.
+     *
+     * @throws Exception If fails.
+     */
+    public void testCombinerTask() throws Exception {
+        HadoopJob gridJob = getHadoopJob("/", "/");
+
+        HadoopTestTaskContext ctx =
+            runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", 
"word2", "10");
+
+        assertEquals("word1,5; word2,10", Joiner.on("; 
").join(ctx.mockOutput()));
+
+        ctx = runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 1, "word3", 
"7", "word4", "15");
+
+        assertEquals("word3,7; word4,15", Joiner.on("; 
").join(ctx.mockOutput()));
+    }
+
+    /**
+     * Runs chain of map-combine task on file block.
+     *
+     * @param fileBlock block of input file to be processed.
+     * @param gridJob Hadoop job implementation.
+     * @return Context of combine task with mock output.
+     * @throws IgniteCheckedException If fails.
+     */
+    private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, 
HadoopJob gridJob)
+        throws IgniteCheckedException {
+        HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, 
gridJob.id(), 0, 0, fileBlock);
+
+        HadoopTestTaskContext mapCtx = new HadoopTestTaskContext(taskInfo, 
gridJob);
+
+        mapCtx.run();
+
+        //Prepare input for combine
+        taskInfo = new HadoopTaskInfo(HadoopTaskType.COMBINE, gridJob.id(), 0, 
0, null);
+
+        HadoopTestTaskContext combineCtx = new HadoopTestTaskContext(taskInfo, 
gridJob);
+
+        combineCtx.makeTreeOfWritables(mapCtx.mockOutput());
+
+        combineCtx.run();
+
+        return combineCtx;
+    }
+
+    /**
+     * Tests all job in complex.
+     * Runs 2 chains of map-combine tasks and sends result into one reduce 
task.
+     *
+     * @throws Exception If fails.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void testAllTasks() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, 
HadoopWordCount2.class.getSimpleName() + "-input");
+
+        URI inFileUri = URI.create(igfsScheme() + inFile.toString());
+
+        generateTestFile(inFile.toString(), "red", 100, "blue", 200, "green", 
150, "yellow", 70);
+
+        //Split file into two blocks
+        long fileLen = igfs.info(inFile).length();
+
+        Long l = fileLen / 2;
+
+        HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, 
l);
+        HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, 
fileLen - l);
+
+        HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + 
PATH_OUTPUT);
+
+        HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, 
gridJob);
+
+        HadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, 
gridJob);
+
+        //Prepare input for combine
+        HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, 
gridJob.id(), 0, 0, null);
+
+        HadoopTestTaskContext reduceCtx = new HadoopTestTaskContext(taskInfo, 
gridJob);
+
+        reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput());
+        reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput());
+
+        reduceCtx.run();
+
+        reduceCtx.taskInfo(new HadoopTaskInfo(HadoopTaskType.COMMIT, 
gridJob.id(), 0, 0, null));
+
+        reduceCtx.run();
+
+        assertEquals(
+            "blue\t200\n" +
+            "green\t150\n" +
+            "red\t100\n" +
+            "yellow\t70\n",
+            readAndSortFile(PATH_OUTPUT + "/" + getOutputFileNamePrefix() + 
"00000")
+        );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
new file mode 100644
index 0000000..27d7fc2
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
+
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
+
+/**
+ * Tests of Map, Combine and Reduce task executions via running of job of 
hadoop API v1.
+ */
+public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
+    /**
+     * Creates WordCount hadoop job for API v1.
+     *
+     * @param inFile Input file name for the job.
+     * @param outFile Output file name for the job.
+     * @return Hadoop job.
+     * @throws IOException If fails.
+     */
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) 
throws Exception {
+        JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile);
+
+        setupFileSystems(jobConf);
+
+        HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);
+
+        UUID uuid = new UUID(0, 0);
+
+        HadoopJobId jobId = new HadoopJobId(uuid, 0);
+
+        return jobInfo.createJob(HadoopV2Job.class, jobId, log, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getOutputFileNamePrefix() {
+        return "part-";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
new file mode 100644
index 0000000..30cf50c
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
+
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
+
+/**
+ * Tests of Map, Combine and Reduce task executions via running of job of 
hadoop API v2.
+ */
+public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
+    /**
+     * Creates WordCount hadoop job for API v2.
+     *
+     * @param inFile Input file name for the job.
+     * @param outFile Output file name for the job.
+     * @return Hadoop job.
+     * @throws Exception if fails.
+     */
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) 
throws Exception {
+        Job job = Job.getInstance();
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        HadoopWordCount2.setTasksClasses(job, true, true, true, false);
+
+        Configuration conf = job.getConfiguration();
+
+        setupFileSystems(conf);
+
+        FileInputFormat.setInputPaths(job, new Path(inFile));
+        FileOutputFormat.setOutputPath(job, new Path(outFile));
+
+        job.setJarByClass(HadoopWordCount2.class);
+
+        Job hadoopJob = HadoopWordCount2.getJob(inFile, outFile);
+
+        HadoopDefaultJobInfo jobInfo = 
createJobInfo(hadoopJob.getConfiguration());
+
+        UUID uuid = new UUID(0, 0);
+
+        HadoopJobId jobId = new HadoopJobId(uuid, 0);
+
+        return jobInfo.createJob(HadoopV2Job.class, jobId, log, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getOutputFileNamePrefix() {
+        return "part-r-";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java
 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java
new file mode 100644
index 0000000..edafecd
--- /dev/null
+++ 
b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import 
org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Round-robin mr planner.
+ */
+public class HadoopTestRoundRobinMrPlanner implements HadoopMapReducePlanner {
+    /** {@inheritDoc} */
+    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, 
Collection<ClusterNode> top,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+        if (top.isEmpty())
+            throw new IllegalArgumentException("Topology is empty");
+
+        // Has at least one element.
+        Iterator<ClusterNode> it = top.iterator();
+
+        Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
+
+        for (HadoopInputSplit block : job.input()) {
+            ClusterNode node = it.next();
+
+            Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id());
+
+            if (nodeBlocks == null) {
+                nodeBlocks = new ArrayList<>();
+
+                mappers.put(node.id(), nodeBlocks);
+            }
+
+            nodeBlocks.add(block);
+
+            if (!it.hasNext())
+                it = top.iterator();
+        }
+
+        int[] rdc = new int[job.info().reducers()];
+
+        for (int i = 0; i < rdc.length; i++)
+            rdc[i] = i;
+
+        return new HadoopDefaultMapReducePlan(mappers, 
Collections.singletonMap(it.next().id(), rdc));
+    }
+}
\ No newline at end of file

Reply via email to