http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
deleted file mode 100644
index 37f81a6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import java.io.IOException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptContextImpl;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop output collector.
- */
-public class HadoopV1OutputCollector implements OutputCollector {
-    /** Job configuration. */
-    private final JobConf jobConf;
-
-    /** Task context. */
-    private final HadoopTaskContext taskCtx;
-
-    /** Optional direct writer. */
-    private final RecordWriter writer;
-
-    /** Task attempt. */
-    private final TaskAttemptID attempt;
-
-    /**
-     * @param jobConf Job configuration.
-     * @param taskCtx Task context.
-     * @param directWrite Direct write flag.
-     * @param fileName File name.
-     * @throws IOException In case of IO exception.
-     */
-    HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
-        @Nullable String fileName, TaskAttemptID attempt) throws IOException {
-        this.jobConf = jobConf;
-        this.taskCtx = taskCtx;
-        this.attempt = attempt;
-
-        if (directWrite) {
-            jobConf.set("mapreduce.task.attempt.id", attempt.toString());
-
-            OutputFormat outFormat = jobConf.getOutputFormat();
-
-            writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
-        }
-        else
-            writer = null;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void collect(Object key, Object val) throws IOException {
-        if (writer != null)
-            writer.write(key, val);
-        else {
-            try {
-                taskCtx.output().write(key, val);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IOException(e);
-            }
-        }
-    }
-
-    /**
-     * Close writer.
-     *
-     * @throws IOException In case of IO exception.
-     */
-    public void closeWriter() throws IOException {
-        if (writer != null)
-            writer.close(Reporter.NULL);
-    }
-
-    /**
-     * Setup task.
-     *
-     * @throws IOException If failed.
-     */
-    public void setup() throws IOException {
-        if (writer != null)
-            jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
-    }
-
-    /**
-     * Commit task.
-     *
-     * @throws IOException In failed.
-     */
-    public void commit() throws IOException {
-        if (writer != null) {
-            OutputCommitter outputCommitter = jobConf.getOutputCommitter();
-
-            TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
-
-            if (outputCommitter.needsTaskCommit(taskCtx))
-                outputCommitter.commitTask(taskCtx);
-        }
-    }
-
-    /**
-     * Abort task.
-     */
-    public void abort() {
-        try {
-            if (writer != null)
-                jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
-        }
-        catch (IOException ignore) {
-            // No-op.
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
deleted file mode 100644
index 0ab1bba..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
-
-/**
- * Hadoop partitioner adapter for v1 API.
- */
-public class HadoopV1Partitioner implements HadoopPartitioner {
-    /** Partitioner instance. */
-    private Partitioner<Object, Object> part;
-
-    /**
-     * @param cls Hadoop partitioner class.
-     * @param conf Job configuration.
-     */
-    public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) {
-        part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int partition(Object key, Object val, int parts) {
-        return part.getPartition(key, val, parts);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
deleted file mode 100644
index e656695..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
-
-/**
- * Hadoop reduce task implementation for v1 API.
- */
-public class HadoopV1ReduceTask extends HadoopV1Task {
-    /** {@code True} if reduce, {@code false} if combine. */
-    private final boolean reduce;
-
-    /**
-     * Constructor.
-     *
-     * @param taskInfo Task info.
-     * @param reduce {@code True} if reduce, {@code false} if combine.
-     */
-    public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) {
-        super(taskInfo);
-
-        this.reduce = reduce;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-        HadoopJob job = taskCtx.job();
-
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
-
-        JobConf jobConf = ctx.jobConf();
-
-        HadoopTaskInput input = taskCtx.input();
-
-        HadoopV1OutputCollector collector = null;
-
-        try {
-            collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
-
-            Reducer reducer;
-            if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
-                jobConf);
-            else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
-                jobConf);
-
-            assert reducer != null;
-
-            try {
-                try {
-                    while (input.next()) {
-                        if (isCancelled())
-                            throw new HadoopTaskCancelledException("Reduce task cancelled.");
-
-                        reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
-                    }
-                }
-                finally {
-                    reducer.close();
-                }
-            }
-            finally {
-                collector.closeWriter();
-            }
-
-            collector.commit();
-        }
-        catch (Exception e) {
-            if (collector != null)
-                collector.abort();
-
-            throw new IgniteCheckedException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
deleted file mode 100644
index 5a63aab..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
-
-/**
- * Hadoop reporter implementation for v1 API.
- */
-public class HadoopV1Reporter implements Reporter {
-    /** Context. */
-    private final HadoopTaskContext ctx;
-
-    /**
-     * Creates new instance.
-     *
-     * @param ctx Context.
-     */
-    public HadoopV1Reporter(HadoopTaskContext ctx) {
-        this.ctx = ctx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setStatus(String status) {
-        // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counters.Counter getCounter(Enum<?> name) {
-        return getCounter(name.getDeclaringClass().getName(), name.name());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counters.Counter getCounter(String grp, String name) {
-        return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void incrCounter(Enum<?> key, long amount) {
-        getCounter(key).increment(amount);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void incrCounter(String grp, String cntr, long amount) {
-        getCounter(grp, cntr).increment(amount);
-    }
-
-    /** {@inheritDoc} */
-    @Override public InputSplit getInputSplit() throws UnsupportedOperationException {
-        throw new UnsupportedOperationException("reporter has no input"); // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override public float getProgress() {
-        return 0.5f; // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override public void progress() {
-        // TODO
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
deleted file mode 100644
index d2f6823..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import java.io.IOException;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
-
-/**
- * Hadoop setup task implementation for v1 API.
- */
-public class HadoopV1SetupTask extends HadoopV1Task {
-    /**
-     * Constructor.
-     *
-     * @param taskInfo Task info.
-     */
-    public HadoopV1SetupTask(HadoopTaskInfo taskInfo) {
-        super(taskInfo);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
-
-        try {
-            ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf());
-
-            OutputCommitter committer = ctx.jobConf().getOutputCommitter();
-
-            if (committer != null)
-                committer.setupJob(ctx.jobContext());
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
deleted file mode 100644
index 203def4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop API v1 splitter.
- */
-public class HadoopV1Splitter {
-    /** */
-    private static final String[] EMPTY_HOSTS = {};
-
-    /**
-     * @param jobConf Job configuration.
-     * @return Collection of mapped splits.
-     * @throws IgniteCheckedException If mapping failed.
-     */
-    public static Collection<HadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException {
-        try {
-            InputFormat<?, ?> format = jobConf.getInputFormat();
-
-            assert format != null;
-
-            InputSplit[] splits = format.getSplits(jobConf, 0);
-
-            Collection<HadoopInputSplit> res = new ArrayList<>(splits.length);
-
-            for (int i = 0; i < splits.length; i++) {
-                InputSplit nativeSplit = splits[i];
-
-                if (nativeSplit instanceof FileSplit) {
-                    FileSplit s = (FileSplit)nativeSplit;
-
-                    res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
-                }
-                else
-                    res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
-            }
-
-            return res;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /**
-     * @param clsName Input split class name.
-     * @param in Input stream.
-     * @param hosts Optional hosts.
-     * @return File block or {@code null} if it is not a {@link FileSplit} instance.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
-        @Nullable String[] hosts) throws IgniteCheckedException {
-        if (!FileSplit.class.getName().equals(clsName))
-            return null;
-
-        FileSplit split = U.newInstance(FileSplit.class);
-
-        try {
-            split.readFields(in);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        if (hosts == null)
-            hosts = EMPTY_HOSTS;
-
-        return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
deleted file mode 100644
index a89323c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import java.io.IOException;
-import java.text.NumberFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.ignite.internal.processors.hadoop.HadoopTask;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Extended Hadoop v1 task.
- */
-public abstract class HadoopV1Task extends HadoopTask {
-    /** Indicates that this task is to be cancelled. */
-    private volatile boolean cancelled;
-
-    /**
-     * Constructor.
-     *
-     * @param taskInfo Task info.
-     */
-    protected HadoopV1Task(HadoopTaskInfo taskInfo) {
-        super(taskInfo);
-    }
-
-    /**
-     * Gets file name for that task result.
-     *
-     * @return File name.
-     */
-    public String fileName() {
-        NumberFormat numFormat = NumberFormat.getInstance();
-
-        numFormat.setMinimumIntegerDigits(5);
-        numFormat.setGroupingUsed(false);
-
-        return "part-" + numFormat.format(info().taskNumber());
-    }
-
-    /**
-     *
-     * @param jobConf Job configuration.
-     * @param taskCtx Task context.
-     * @param directWrite Direct write flag.
-     * @param fileName File name.
-     * @param attempt Attempt of task.
-     * @return Collector.
-     * @throws IOException In case of IO exception.
-     */
-    protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx,
-        boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException {
-        HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite,
-            fileName, attempt) {
-            /** {@inheritDoc} */
-            @Override public void collect(Object key, Object val) throws IOException {
-                if (cancelled)
-                    throw new HadoopTaskCancelledException("Task cancelled.");
-
-                super.collect(key, val);
-            }
-        };
-
-        collector.setup();
-
-        return collector;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancel() {
-        cancelled = true;
-    }
-
-    /** Returns true if task is cancelled. */
-    public boolean isCancelled() {
-        return cancelled;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
deleted file mode 100644
index 9632525..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.util.Collection;
-import java.util.LinkedList;
-
-/**
- * Replacement for Hadoop {@code org.apache.hadoop.util.Daemon} class.
- */
-@SuppressWarnings("UnusedDeclaration")
-public class HadoopDaemon extends Thread {
-    /** Lock object used for synchronization. */
-    private static final Object lock = new Object();
-
-    /** Collection to hold the threads to be stopped. */
-    private static Collection<HadoopDaemon> daemons = new LinkedList<>();
-
-    {
-        setDaemon(true); // always a daemon
-    }
-
-    /** Runnable of this thread, may be this. */
-    final Runnable runnable;
-
-    /**
-     * Construct a daemon thread.
-     */
-    public HadoopDaemon() {
-        super();
-
-        runnable = this;
-
-        enqueueIfNeeded();
-    }
-
-    /**
-     * Construct a daemon thread.
-     */
-    public HadoopDaemon(Runnable runnable) {
-        super(runnable);
-
-        this.runnable = runnable;
-
-        this.setName(runnable.toString());
-
-        enqueueIfNeeded();
-    }
-
-    /**
-     * Construct a daemon thread to be part of a specified thread group.
-     */
-    public HadoopDaemon(ThreadGroup grp, Runnable runnable) {
-        super(grp, runnable);
-
-        this.runnable = runnable;
-
-        this.setName(runnable.toString());
-
-        enqueueIfNeeded();
-    }
-
-    /**
-     * Getter for the runnable. May return this.
-     *
-     * @return the runnable
-     */
-    public Runnable getRunnable() {
-        return runnable;
-    }
-
-    /**
-     * if the runnable is a Hadoop org.apache.hadoop.hdfs.PeerCache Runnable.
-     *
-     * @param r the runnable.
-     * @return true if it is.
-     */
-    private static boolean isPeerCacheRunnable(Runnable r) {
-        String name = r.getClass().getName();
-
-        return name.startsWith("org.apache.hadoop.hdfs.PeerCache");
-    }
-
-    /**
-     * Enqueue this thread if it should be stopped upon the task end.
-     */
-    private void enqueueIfNeeded() {
-        synchronized (lock) {
-            if (daemons == null)
-                throw new RuntimeException("Failed to create HadoopDaemon (its registry is already cleared): " +
-                    "[classLoader=" + getClass().getClassLoader() + ']');
-
-            if (runnable.getClass().getClassLoader() == getClass().getClassLoader() && isPeerCacheRunnable(runnable))
-                daemons.add(this);
-        }
-    }
-
-    /**
-     * Stops all the registered threads.
-     */
-    public static void dequeueAndStopAll() {
-        synchronized (lock) {
-            if (daemons != null) {
-                for (HadoopDaemon daemon : daemons)
-                    daemon.interrupt();
-
-                daemons = null;
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java
deleted file mode 100644
index c7e8a0a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-
-/**
- * Split serialized in external file.
- */
-public class HadoopExternalSplit extends HadoopInputSplit {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long off;
-
-    /**
-     * For {@link Externalizable}.
-     */
-    public HadoopExternalSplit() {
-        // No-op.
-    }
-
-    /**
-     * @param hosts Hosts.
-     * @param off Offset of this split in external file.
-     */
-    public HadoopExternalSplit(String[] hosts, long off) {
-        assert off >= 0 : off;
-        assert hosts != null;
-
-        this.hosts = hosts;
-        this.off = off;
-    }
-
-    /**
-     * @return Offset of this input split in external file.
-     */
-    public long offset() {
-        return off;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeLong(off);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        off = in.readLong();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        HadoopExternalSplit that = (HadoopExternalSplit) o;
-
-        return off == that.off;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return (int)(off ^ (off >>> 32));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java
deleted file mode 100644
index 844e7f8..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * The wrapper around external serializer.
- */
-public class HadoopSerializationWrapper<T> implements HadoopSerialization {
-    /** External serializer - writer. */
-    private final Serializer<T> serializer;
-
-    /** External serializer - reader. */
-    private final Deserializer<T> deserializer;
-
-    /** Data output for current write operation. */
-    private OutputStream currOut;
-
-    /** Data input for current read operation. */
-    private InputStream currIn;
-
-    /** Wrapper around current output to provide OutputStream interface. */
-    private final OutputStream outStream = new OutputStream() {
-        /** {@inheritDoc} */
-        @Override public void write(int b) throws IOException {
-            currOut.write(b);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(byte[] b, int off, int len) throws IOException {
-            currOut.write(b, off, len);
-        }
-    };
-
-    /** Wrapper around current input to provide InputStream interface. */
-    private final InputStream inStream = new InputStream() {
-        /** {@inheritDoc} */
-        @Override public int read() throws IOException {
-            return currIn.read();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int read(byte[] b, int off, int len) throws IOException {
-            return currIn.read(b, off, len);
-        }
-    };
-
-    /**
-     * @param serialization External serializer to wrap.
-     * @param cls The class to serialize.
-     */
-    public HadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException {
-        assert cls != null;
-
-        serializer = serialization.getSerializer(cls);
-        deserializer = serialization.getDeserializer(cls);
-
-        try {
-            serializer.open(outStream);
-            deserializer.open(inStream);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
-        assert out != null;
-        assert obj != null;
-
-        try {
-            currOut = (OutputStream)out;
-
-            serializer.serialize((T)obj);
-
-            currOut = null;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
-        assert in != null;
-
-        try {
-            currIn = (InputStream)in;
-
-            T res = deserializer.deserialize((T) obj);
-
-            currIn = null;
-
-            return res;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IgniteCheckedException {
-        try {
-            serializer.close();
-            deserializer.close();
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java
deleted file mode 100644
index 8bd71e0..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Fake manager for shutdown hooks.
- */
-public class HadoopShutdownHookManager {
-    /** */
-    private static final HadoopShutdownHookManager MGR = new HadoopShutdownHookManager();
-
-    /**
-     * Return <code>ShutdownHookManager</code> singleton.
-     *
-     * @return <code>ShutdownHookManager</code> singleton.
-     */
-    public static HadoopShutdownHookManager get() {
-        return MGR;
-    }
-
-    /** */
-    private Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>());
-
-    /** */
-    private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
-
-    /**
-     * Singleton.
-     */
-    private HadoopShutdownHookManager() {
-        // No-op.
-    }
-
-    /**
-     * Adds a shutdownHook with a priority, the higher the priority
-     * the earlier will run. ShutdownHooks with same priority run
-     * in a non-deterministic order.
-     *
-     * @param shutdownHook shutdownHook <code>Runnable</code>
-     * @param priority priority of the shutdownHook.
-     */
-    public void addShutdownHook(Runnable shutdownHook, int priority) {
-        if (shutdownHook == null)
-            throw new IllegalArgumentException("shutdownHook cannot be NULL");
-
-        hooks.add(shutdownHook);
-    }
-
-    /**
-     * Removes a shutdownHook.
-     *
-     * @param shutdownHook shutdownHook to remove.
-     * @return TRUE if the shutdownHook was registered and removed,
-     * FALSE otherwise.
-     */
-    public boolean removeShutdownHook(Runnable shutdownHook) {
-        return hooks.remove(shutdownHook);
-    }
-
-    /**
-     * Indicates if a shutdownHook is registered or not.
-     *
-     * @param shutdownHook shutdownHook to check if registered.
-     * @return TRUE/FALSE depending if the shutdownHook is is registered.
-     */
-    public boolean hasShutdownHook(Runnable shutdownHook) {
-        return hooks.contains(shutdownHook);
-    }
-
-    /**
-     * Indicates if shutdown is in progress or not.
-     *
-     * @return TRUE if the shutdown is in progress, otherwise FALSE.
-     */
-    public boolean isShutdownInProgress() {
-        return shutdownInProgress.get();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java
deleted file mode 100644
index df77adb..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * The wrapper for native hadoop input splits.
- *
- * Warning!! This class must not depend on any Hadoop classes directly or indirectly.
- */
-public class HadoopSplitWrapper extends HadoopInputSplit {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Native hadoop input split. */
-    private byte[] bytes;
-
-    /** */
-    private String clsName;
-
-    /** Internal ID */
-    private int id;
-
-    /**
-     * Creates new split wrapper.
-     */
-    public HadoopSplitWrapper() {
-        // No-op.
-    }
-
-    /**
-     * Creates new split wrapper.
-     *
-     * @param id Split ID.
-     * @param clsName Class name.
-     * @param bytes Serialized class.
-     * @param hosts Hosts where split is located.
-     */
-    public HadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) {
-        assert hosts != null;
-        assert clsName != null;
-        assert bytes != null;
-
-        this.hosts = hosts;
-        this.id = id;
-
-        this.clsName = clsName;
-        this.bytes = bytes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(id);
-
-        out.writeUTF(clsName);
-        U.writeByteArray(out, bytes);
-    }
-
-    /**
-     * @return Class name.
-     */
-    public String className() {
-        return clsName;
-    }
-
-    /**
-     * @return Class bytes.
-     */
-    public byte[] bytes() {
-        return bytes;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        id = in.readInt();
-
-        clsName = in.readUTF();
-        bytes = U.readByteArray(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        HadoopSplitWrapper that = (HadoopSplitWrapper)o;
-
-        return id == that.id;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return id;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java
deleted file mode 100644
index abb904c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.IOException;
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-
-/**
- * Hadoop cleanup task (commits or aborts job).
- */
-public class HadoopV2CleanupTask extends HadoopV2Task {
-    /** Abort flag. */
-    private final boolean abort;
-
-    /**
-     * @param taskInfo Task info.
-     * @param abort Abort flag.
-     */
-    public HadoopV2CleanupTask(HadoopTaskInfo taskInfo, boolean abort) {
-        super(taskInfo);
-
-        this.abort = abort;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ConstantConditions")
-    @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException {
-        JobContextImpl jobCtx = taskCtx.jobContext();
-
-        try {
-            OutputFormat outputFormat = getOutputFormat(jobCtx);
-
-            OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext());
-
-            if (committer != null) {
-                if (abort)
-                    committer.abortJob(jobCtx, JobStatus.State.FAILED);
-                else
-                    committer.commitJob(jobCtx);
-            }
-        }
-        catch (ClassNotFoundException | IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
deleted file mode 100644
index 2ff2945..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.ReduceContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
-
-/**
- * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks.
- */
-public class HadoopV2Context extends JobContextImpl implements MapContext, ReduceContext {
-    /** Input reader to overriding of HadoopTaskContext input. */
-    private RecordReader reader;
-
-    /** Output writer to overriding of HadoopTaskContext output. */
-    private RecordWriter writer;
-
-    /** Output is provided by executor environment. */
-    private final HadoopTaskOutput output;
-
-    /** Input is provided by executor environment. */
-    private final HadoopTaskInput input;
-
-    /** Unique identifier for a task attempt. */
-    private final TaskAttemptID taskAttemptID;
-
-    /** Indicates that this task is to be cancelled. */
-    private volatile boolean cancelled;
-
-    /** Input split. */
-    private InputSplit inputSplit;
-
-    /** */
-    private final HadoopTaskContext ctx;
-
-    /** */
-    private String status;
-
-    /**
-     * @param ctx Context for IO operations.
-     */
-    public HadoopV2Context(HadoopV2TaskContext ctx) {
-        super(ctx.jobConf(), ctx.jobContext().getJobID());
-
-        taskAttemptID = ctx.attemptId();
-
-        conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString());
-        conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString());
-
-        output = ctx.output();
-        input = ctx.input();
-
-        this.ctx = ctx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public InputSplit getInputSplit() {
-        if (inputSplit == null) {
-            HadoopInputSplit split = ctx.taskInfo().inputSplit();
-
-            if (split == null)
-                return null;
-
-            if (split instanceof HadoopFileBlock) {
-                HadoopFileBlock fileBlock = (HadoopFileBlock)split;
-
-                inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null);
-            }
-            else
-            {
-                try {
-                    inputSplit = (InputSplit) ((HadoopV2TaskContext)ctx).getNativeSplit(split);
-                } catch (IgniteCheckedException e) {
-                    throw new IllegalStateException(e);
-                }
-            }
-        }
-
-        return inputSplit;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
-        if (cancelled)
-            throw new HadoopTaskCancelledException("Task cancelled.");
-
-        return reader.nextKeyValue();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Object getCurrentKey() throws IOException, InterruptedException {
-        if (reader != null)
-            return reader.getCurrentKey();
-
-        return input.key();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Object getCurrentValue() throws IOException, InterruptedException {
-        return reader.getCurrentValue();
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void write(Object key, Object val) throws IOException, InterruptedException {
-        if (cancelled)
-            throw new HadoopTaskCancelledException("Task cancelled.");
-
-        if (writer != null)
-            writer.write(key, val);
-        else {
-            try {
-                output.write(key, val);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IOException(e);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public OutputCommitter getOutputCommitter() {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskAttemptID getTaskAttemptID() {
-        return taskAttemptID;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setStatus(String msg) {
-        status = msg;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getStatus() {
-        return status;
-    }
-
-    /** {@inheritDoc} */
-    @Override public float getProgress() {
-        return 0.5f; // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter getCounter(Enum<?> cntrName) {
-        return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter getCounter(String grpName, String cntrName) {
-        return new HadoopV2Counter(ctx.counter(grpName, cntrName, HadoopLongCounter.class));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void progress() {
-        // No-op.
-    }
-
-    /**
-     * Overrides default input data reader.
-     *
-     * @param reader New reader.
-     */
-    public void reader(RecordReader reader) {
-        this.reader = reader;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean nextKey() throws IOException, InterruptedException {
-        if (cancelled)
-            throw new HadoopTaskCancelledException("Task cancelled.");
-
-        return input.next();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterable getValues() throws IOException, InterruptedException {
-        return new Iterable() {
-            @Override public Iterator iterator() {
-                return input.values();
-            }
-        };
-    }
-
-    /**
-     * @return Overridden output data writer.
-     */
-    public RecordWriter writer() {
-        return writer;
-    }
-
-    /**
-     * Overrides default output data writer.
-     *
-     * @param writer New writer.
-     */
-    public void writer(RecordWriter writer) {
-        this.writer = writer;
-    }
-
-    /**
-     * Cancels the task by stop the IO.
-     */
-    public void cancel() {
-        cancelled = true;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
deleted file mode 100644
index cad9e64..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
-
-/**
- * Adapter from own counter implementation into Hadoop API Counter od version 2.0.
- */
-public class HadoopV2Counter implements Counter {
-    /** Delegate. */
-    private final HadoopLongCounter cntr;
-
-    /**
-     * Creates new instance with given delegate.
-     *
-     * @param cntr Internal counter.
-     */
-    public HadoopV2Counter(HadoopLongCounter cntr) {
-        assert cntr != null : "counter must be non-null";
-
-        this.cntr = cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setDisplayName(String displayName) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getName() {
-        return cntr.name();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getDisplayName() {
-        return getName();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getValue() {
-        return cntr.value();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setValue(long val) {
-        cntr.value(val);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void increment(long incr) {
-        cntr.increment(incr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter getUnderlyingCounter() {
-        return this;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(DataOutput out) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readFields(DataInput in) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
deleted file mode 100644
index 595474c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.split.JobSplit;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
-import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1Splitter;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
-
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir;
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir;
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException;
-import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.FsCacheKey;
-import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
-import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
-
-/**
- * Hadoop job implementation for v2 API.
- */
-public class HadoopV2Job implements HadoopJob {
-    /** */
-    private final JobConf jobConf;
-
-    /** */
-    private final JobContextImpl jobCtx;
-
-    /** Hadoop job ID. */
-    private final HadoopJobId jobId;
-
-    /** Job info. */
-    protected final HadoopJobInfo jobInfo;
-
-    /** Native library names. */
-    private final String[] libNames;
-
-    /** */
-    private final JobID hadoopJobID;
-
-    /** */
-    private final HadoopV2JobResourceManager rsrcMgr;
-
-    /** */
-    private final ConcurrentMap<T2<HadoopTaskType, Integer>, 
GridFutureAdapter<HadoopTaskContext>> ctxs =
-        new ConcurrentHashMap8<>();
-
-    /** Pool of task context classes, and thus of their class loading environments. */
-    private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = 
new ConcurrentLinkedQueue<>();
-
-    /** All created task context classes. */
-    private final Queue<Class<? extends HadoopTaskContext>> fullCtxClsQueue = 
new ConcurrentLinkedDeque<>();
-
-    /** File system cache map. */
-    private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = 
createHadoopLazyConcurrentMap();
-
-    /** Local node ID. */
-    private volatile UUID locNodeId;
-
-    /** Serialized JobConf. */
-    private volatile byte[] jobConfData;
-
-    /**
-     * Constructor.
-     *
-     * @param jobId Job ID.
-     * @param jobInfo Job info.
-     * @param log Logger.
-     * @param libNames Optional additional native library names.
-     */
-    public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, 
IgniteLogger log,
-        @Nullable String[] libNames) {
-        assert jobId != null;
-        assert jobInfo != null;
-
-        this.jobId = jobId;
-        this.jobInfo = jobInfo;
-        this.libNames = libNames;
-
-        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(getClass().getClassLoader());
-
-        try {
-            hadoopJobID = new JobID(jobId.globalId().toString(), 
jobId.localId());
-
-            jobConf = new JobConf();
-
-            HadoopFileSystemsUtils.setupFileSystems(jobConf);
-
-            for (Map.Entry<String,String> e : jobInfo.properties().entrySet())
-                jobConf.set(e.getKey(), e.getValue());
-
-            jobCtx = new JobContextImpl(jobConf, hadoopJobID);
-
-            rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this);
-        }
-        finally {
-            HadoopUtils.setContextClassLoader(oldLdr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopJobId id() {
-        return jobId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopJobInfo info() {
-        return jobInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<HadoopInputSplit> input() throws 
IgniteCheckedException {
-        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(jobConf.getClassLoader());
-
-        try {
-            String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
-
-            if (jobDirPath == null) { // Probably job was submitted not by 
hadoop client.
-                // Assume that we have needed classes and try to generate 
input splits ourselves.
-                if (jobConf.getUseNewMapper())
-                    return HadoopV2Splitter.splitJob(jobCtx);
-                else
-                    return HadoopV1Splitter.splitJob(jobConf);
-            }
-
-            Path jobDir = new Path(jobDirPath);
-
-            try {
-                FileSystem fs = fileSystem(jobDir.toUri(), jobConf);
-
-                JobSplit.TaskSplitMetaInfo[] metaInfos = 
SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
-                    jobDir);
-
-                if (F.isEmpty(metaInfos))
-                    throw new IgniteCheckedException("No input splits found.");
-
-                Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir);
-
-                try (FSDataInputStream in = fs.open(splitsFile)) {
-                    Collection<HadoopInputSplit> res = new 
ArrayList<>(metaInfos.length);
-
-                    for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) {
-                        long off = metaInfo.getStartOffset();
-
-                        String[] hosts = metaInfo.getLocations();
-
-                        in.seek(off);
-
-                        String clsName = Text.readString(in);
-
-                        HadoopFileBlock block = 
HadoopV1Splitter.readFileBlock(clsName, in, hosts);
-
-                        if (block == null)
-                            block = HadoopV2Splitter.readFileBlock(clsName, 
in, hosts);
-
-                        res.add(block != null ? block : new 
HadoopExternalSplit(hosts, off));
-                    }
-
-                    return res;
-                }
-            }
-            catch (Throwable e) {
-                if (e instanceof Error)
-                    throw (Error)e;
-                else
-                    throw transformException(e);
-            }
-        }
-        finally {
-            HadoopUtils.restoreContextClassLoader(oldLdr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"unchecked", "MismatchedQueryAndUpdateOfCollection" })
-    @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) 
throws IgniteCheckedException {
-        T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(),  
info.taskNumber());
-
-        GridFutureAdapter<HadoopTaskContext> fut = ctxs.get(locTaskId);
-
-        if (fut != null)
-            return fut.get();
-
-        GridFutureAdapter<HadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, 
fut = new GridFutureAdapter<>());
-
-        if (old != null)
-            return old.get();
-
-        Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll();
-
-        try {
-            if (cls == null) {
-                // If there is no pooled class, then load new one.
-                // Note that the class loader is identified by the task it was 
initially created for,
-                // but later it may be reused for other tasks.
-                HadoopClassLoader ldr = new 
HadoopClassLoader(rsrcMgr.classPath(),
-                    HadoopClassLoader.nameForTask(info, false), libNames);
-
-                cls = (Class<? extends 
HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
-
-                fullCtxClsQueue.add(cls);
-            }
-
-            Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, 
HadoopJob.class,
-                HadoopJobId.class, UUID.class, DataInput.class);
-
-            if (jobConfData == null)
-                synchronized(jobConf) {
-                    if (jobConfData == null) {
-                        ByteArrayOutputStream buf = new 
ByteArrayOutputStream();
-
-                        jobConf.write(new DataOutputStream(buf));
-
-                        jobConfData = buf.toByteArray();
-                    }
-                }
-
-            HadoopTaskContext res = (HadoopTaskContext)ctr.newInstance(info, 
this, jobId, locNodeId,
-                new DataInputStream(new ByteArrayInputStream(jobConfData)));
-
-            fut.onDone(res);
-
-            return res;
-        }
-        catch (Throwable e) {
-            IgniteCheckedException te = transformException(e);
-
-            fut.onDone(te);
-
-            if (e instanceof Error)
-                throw (Error)e;
-
-            throw te;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize(boolean external, UUID locNodeId) throws 
IgniteCheckedException {
-        assert locNodeId != null;
-
-        this.locNodeId = locNodeId;
-
-        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(getClass().getClassLoader());
-
-        try {
-            rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, 
jobId));
-        }
-        finally {
-            HadoopUtils.restoreContextClassLoader(oldLdr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ThrowFromFinallyBlock")
-    @Override public void dispose(boolean external) throws 
IgniteCheckedException {
-        try {
-            if (rsrcMgr != null && !external) {
-                File jobLocDir = jobLocalDir(locNodeId, jobId);
-
-                if (jobLocDir.exists())
-                    U.delete(jobLocDir);
-            }
-        }
-        finally {
-            taskCtxClsPool.clear();
-
-            Throwable err = null;
-
-            // Stop the daemon threads that have been created
-            // with the task class loaders:
-            while (true) {
-                Class<? extends HadoopTaskContext> cls = 
fullCtxClsQueue.poll();
-
-                if (cls == null)
-                    break;
-
-                try {
-                    final ClassLoader ldr = cls.getClassLoader();
-
-                    try {
-                        // Stop Hadoop daemons for this *task*:
-                        stopHadoopFsDaemons(ldr);
-                    }
-                    catch (Exception e) {
-                        if (err == null)
-                            err = e;
-                    }
-
-                    // Also close all the FileSystems cached in
-                    // HadoopLazyConcurrentMap for this *task* class loader:
-                    closeCachedTaskFileSystems(ldr);
-                }
-                catch (Throwable e) {
-                    if (err == null)
-                        err = e;
-
-                    if (e instanceof Error)
-                        throw (Error)e;
-                }
-            }
-
-            assert fullCtxClsQueue.isEmpty();
-
-            try {
-                // Close all cached file systems for this *Job*:
-                fsMap.close();
-            }
-            catch (Exception e) {
-                if (err == null)
-                    err = e;
-            }
-
-            if (err != null)
-                throw U.cast(err);
-        }
-    }
-
-    /**
-     * Stops Hadoop Fs daemon threads.
-     * @param ldr The task ClassLoader to stop the daemons for.
-     * @throws Exception On error.
-     */
-    private void stopHadoopFsDaemons(ClassLoader ldr) throws Exception {
-        Class<?> daemonCls = ldr.loadClass(HadoopClassLoader.CLS_DAEMON);
-
-        Method m = daemonCls.getMethod("dequeueAndStopAll");
-
-        m.invoke(null);
-    }
-
-    /**
-     * Closes all the file systems used by the task.
-     * @param ldr The task class loader.
-     * @throws Exception On error.
-     */
-    private void closeCachedTaskFileSystems(ClassLoader ldr) throws Exception {
-        Class<?> clazz = ldr.loadClass(HadoopV2TaskContext.class.getName());
-
-        Method m = clazz.getMethod("close");
-
-        m.invoke(null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws 
IgniteCheckedException {
-        rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws 
IgniteCheckedException {
-        HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), 
info.taskNumber())).get();
-
-        taskCtxClsPool.add(ctx.getClass());
-
-        File locDir = taskLocalDir(locNodeId, info);
-
-        if (locDir.exists())
-            U.delete(locDir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cleanupStagingDirectory() {
-        rsrcMgr.cleanupStagingDirectory();
-    }
-
-    /**
-     * Getter for job configuration.
-     * @return The job configuration.
-     */
-    public JobConf jobConf() {
-        return jobConf;
-    }
-
-    /**
-     * Gets file system for this job.
-     * @param uri The uri.
-     * @param cfg The configuration.
-     * @return The file system.
-     * @throws IOException On error.
-     */
-    public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws 
IOException {
-        return fileSystemForMrUserWithCaching(uri, cfg, fsMap);
-    }
-}
\ No newline at end of file

Reply via email to