http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
new file mode 100644
index 0000000..bd8ed2d
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -0,0 +1,1076 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.igfs.common.IgfsLogger;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
+import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
+import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream;
+import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream;
+import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper;
+import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
+import org.apache.ignite.internal.processors.igfs.IgfsPaths;
+import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
+import static 
org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
+import static 
org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser;
+import static org.apache.ignite.igfs.IgfsMode.PROXY;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
+import static 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
+import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME;
+
+/**
+ * {@code IGFS} Hadoop 2.x file system driver over file system API. To use
+ * {@code IGFS} as Hadoop file system, you should configure this class
+ * in Hadoop's {@code core-site.xml} as follows:
+ * <pre name="code" class="xml">
+ *  &lt;property&gt;
+ *      &lt;name&gt;fs.default.name&lt;/name&gt;
+ *      &lt;value&gt;igfs://ipc&lt;/value&gt;
+ *  &lt;/property&gt;
+ *
+ *  &lt;property&gt;
+ *      &lt;name&gt;fs.AbstractFileSystem.igfs.impl&lt;/name&gt;
+ *      
&lt;value&gt;org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem&lt;/value&gt;
+ *  &lt;/property&gt;
+ * </pre>
+ * You should also add Ignite JAR and all libraries to Hadoop classpath. To
+ * do this, add following lines to {@code conf/hadoop-env.sh} script in Hadoop
+ * distribution:
+ * <pre name="code" class="bash">
+ * export IGNITE_HOME=/path/to/Ignite/distribution
+ * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
+ *
+ * for f in $IGNITE_HOME/libs/*.jar; do
+ *  export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
+ * done
+ * </pre>
+ * <h1 class="header">Data vs. Client Nodes</h1>
+ * Hadoop needs to use its FileSystem remotely from client nodes as well as 
directly on
+ * data nodes. Client nodes are responsible for basic file system operations 
as well as
+ * accessing data nodes remotely. Usually, client nodes are started together
+ * with {@code job-submitter} or {@code job-scheduler} processes, while data 
nodes are usually
+ * started together with Hadoop {@code task-tracker} processes.
+ * <p>
+ * For sample client and data node configuration refer to {@code 
config/hadoop/default-config-client.xml}
+ * and {@code config/hadoop/default-config.xml} configuration files in Ignite 
installation.
+ */
+public class IgniteHadoopFileSystem extends AbstractFileSystem implements 
Closeable {
+    /** Logger. */
+    private static final Log LOG = 
LogFactory.getLog(IgniteHadoopFileSystem.class);
+
+    /** Ensures that close routine is invoked at most once. */
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Grid remote client. Created during initialization and reset to {@code null} when closed. */
+    private HadoopIgfsWrapper rmtClient;
+
+    /** The name of the user on whose behalf this file system was created. */
+    private final String user;
+
+    /** Working directory. */
+    private IgfsPath workingDir;
+
+    /** File system URI exactly as it was passed to the constructor. */
+    private final URI uri;
+
+    /** Authority part of the file system URI. */
+    private String uriAuthority;
+
+    /** Client logger. */
+    private IgfsLogger clientLog;
+
+    /** Server block size as reported by the handshake response. */
+    private long grpBlockSize;
+
+    /** Default replication factor, read from the {@code dfs.replication} configuration property. */
+    private short dfltReplication;
+
+    /** URI of the secondary file system. Assigned only when a secondary file system is initialized. */
+    private URI secondaryUri;
+
+    /** Mode resolver. */
+    private IgfsModeResolver modeRslvr;
+
+    /** The secondary file system factory. */
+    private HadoopFileSystemFactory factory;
+
+    /** Whether custom sequential reads before prefetch value is provided (i.e. the configured value is positive). */
+    private boolean seqReadsBeforePrefetchOverride;
+
+    /** Custom-provided sequential reads before prefetch. */
+    private int seqReadsBeforePrefetch;
+
+    /** Flag that controls whether file writes should be colocated on data node. */
+    private boolean colocateFileWrites;
+
+    /** Prefer local writes. */
+    private boolean preferLocFileWrites;
+
+    /**
+     * Creates the file system and connects it to IGFS.
+     * <p>
+     * If initialization fails with an {@link IOException}, every resource acquired up to that point (remote
+     * client, enabled client logger, started secondary file system factory) is released before the exception
+     * is rethrown.
+     *
+     * @param name URI for file system.
+     * @param cfg Configuration.
+     * @throws URISyntaxException if name has invalid syntax.
+     * @throws IOException If initialization failed.
+     */
+    public IgniteHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException {
+        super(HadoopIgfsEndpoint.normalize(name), IGFS_SCHEME, false, -1);
+
+        uri = name;
+
+        user = getFsHadoopUser();
+
+        try {
+            initialize(name, cfg);
+        }
+        catch (IOException e) {
+            // Close client if exception occurred.
+            if (rmtClient != null)
+                rmtClient.close(false);
+
+            // The logger may not have been created yet if the failure happened early.
+            if (clientLog != null && clientLog.isLogEnabled())
+                clientLog.close();
+
+            // A non-null lifecycle-aware factory has already been started by initialize() at this point,
+            // so it has to be stopped here; nobody else will get a reference to this instance.
+            if (factory instanceof LifecycleAware) {
+                try {
+                    ((LifecycleAware)factory).stop();
+                }
+                catch (RuntimeException stopErr) {
+                    // Keep the original failure as the primary exception.
+                    e.addSuppressed(stopErr);
+                }
+            }
+
+            throw e;
+        }
+
+        workingDir = new IgfsPath("/user/" + user);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only absolute URIs are validated: the scheme must be the IGFS scheme and the authority must match the
+     * authority this file system was created with. Paths without a scheme are accepted as is.
+     *
+     * @throws InvalidPathException If scheme or authority of an absolute path does not match.
+     */
+    @Override public void checkPath(Path path) {
+        URI pathUri = path.toUri();
+
+        if (pathUri.isAbsolute()) {
+            // Report the actual scheme (not the authority) when the scheme check fails.
+            if (!F.eq(pathUri.getScheme(), IGFS_SCHEME))
+                throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" +
+                    pathUri.getScheme() + ']');
+
+            if (!F.eq(pathUri.getAuthority(), uriAuthority))
+                throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" +
+                    pathUri.getAuthority() + ']');
+        }
+    }
+
+    /**
+     * Overrides the colocation flag that is passed to the remote client when new files are created.
+     * Intended for direct users of the file system or Visor.
+     *
+     * @param colocateFileWrites Whether all ongoing file writes should be colocated.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void colocateFileWrites(boolean colocateFileWrites) {
+        this.colocateFileWrites = colocateFileWrites;
+    }
+
+    /**
+     * Marks the beginning of a file system operation and rejects it if the file system was already closed.
+     *
+     * @throws IOException If file system is stopped.
+     */
+    private void enterBusy() throws IOException {
+        boolean stopped = closeGuard.get();
+
+        if (stopped)
+            throw new IOException("File system is stopped.");
+    }
+
+    /**
+     * Marks the end of a file system operation started with {@code enterBusy()}.
+     * Currently there is nothing to release.
+     */
+    private void leaveBusy() {
+        // No-op.
+    }
+
+    /**
+     * Reads client settings from the configuration, connects to IGFS and, if at least one path works in
+     * {@code PROXY} mode, connects to the secondary file system.
+     * <p>
+     * Steps performed:
+     * <ol>
+     *     <li>validate the URI scheme and remember the authority;</li>
+     *     <li>read prefetch, replication, colocation and logging settings;</li>
+     *     <li>create the remote client and perform the handshake;</li>
+     *     <li>create the client logger (enabled or disabled) and the mode resolver;</li>
+     *     <li>obtain and start the secondary file system factory when {@code PROXY} mode is in use.</li>
+     * </ol>
+     *
+     * @param name URI passed to constructor.
+     * @param cfg Configuration passed to constructor.
+     * @throws IOException If initialization failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private void initialize(URI name, Configuration cfg) throws IOException {
+        enterBusy();
+
+        try {
+            if (rmtClient != null)
+                throw new IOException("File system is already initialized: " + rmtClient);
+
+            A.notNull(name, "name");
+            A.notNull(cfg, "cfg");
+
+            if (!IGFS_SCHEME.equals(name.getScheme()))
+                throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME +
+                    "://[name]/[optional_path], actual=" + name + ']');
+
+            uriAuthority = name.getAuthority();
+
+            // Override sequential reads before prefetch if needed.
+            seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
+
+            if (seqReadsBeforePrefetch > 0)
+                seqReadsBeforePrefetchOverride = true;
+
+            // In Ignite replication factor is controlled by data cache affinity.
+            // We use replication factor to force the whole file to be stored on local node.
+            dfltReplication = (short)cfg.getInt("dfs.replication", 3);
+
+            // Get file colocation control flag.
+            colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false);
+            preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false);
+
+            // Get log directory.
+            String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR);
+
+            File logDirFile = U.resolveIgnitePath(logDirCfg);
+
+            String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
+
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
+
+            // Handshake.
+            IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
+
+            grpBlockSize = handshake.blockSize();
+
+            IgfsPaths paths = handshake.secondaryPaths();
+
+            Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false);
+
+            // Sampling flag from the handshake, when present, takes precedence over the local setting.
+            if (handshake.sampling() != null ? handshake.sampling() : logEnabled) {
+                // Initiate client logger.
+                if (logDir == null)
+                    throw new IOException("Failed to resolve log directory: " + logDirCfg);
+
+                Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE);
+
+                clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize);
+            }
+            else
+                clientLog = IgfsLogger.disabledLogger();
+
+            try {
+                modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes());
+            }
+            catch (IgniteCheckedException ice) {
+                throw new IOException(ice);
+            }
+
+            // Secondary file system is needed only if the default mode or any path mode is PROXY.
+            boolean initSecondary = paths.defaultMode() == PROXY;
+
+            if (!initSecondary && paths.pathModes() != null) {
+                for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
+                    IgfsMode mode = pathMode.getValue();
+
+                    if (mode == PROXY) {
+                        initSecondary = true;
+
+                        break;
+                    }
+                }
+            }
+
+            if (initSecondary) {
+                try {
+                    factory = (HadoopFileSystemFactory)paths.getPayload(getClass().getClassLoader());
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to get secondary file system factory.", e);
+                }
+
+                if (factory == null)
+                    throw new IOException("Failed to get secondary file system factory (did you set " +
+                        IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFileSystem\" in " +
+                        FileSystemConfiguration.class.getName() + "?)");
+
+                if (factory instanceof LifecycleAware)
+                    ((LifecycleAware)factory).start();
+
+                try {
+                    FileSystem secFs = factory.get(user);
+
+                    secondaryUri = secFs.getUri();
+
+                    A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
+                }
+                catch (IOException e) {
+                    throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
+                }
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only the first invocation has an effect. It closes the remote client, closes the client logger if
+     * logging is enabled and stops the secondary file system factory if it is lifecycle-aware.
+     */
+    @Override public void close() throws IOException {
+        // Somebody has already closed (or is closing) the file system.
+        if (!closeGuard.compareAndSet(false, true))
+            return;
+
+        // Nothing to release if the client is absent.
+        if (rmtClient == null)
+            return;
+
+        rmtClient.close(false);
+
+        if (clientLog.isLogEnabled())
+            clientLog.close();
+
+        if (factory instanceof LifecycleAware)
+            ((LifecycleAware)factory).stop();
+
+        // Reset initialized resources.
+        rmtClient = null;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return URI that was passed to the constructor.
+     */
+    @Override public URI getUri() {
+        return this.uri;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Always {@code -1}, the same value that is passed to the super constructor.
+     */
+    @Override public int getUriDefaultPort() {
+        return -1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FsServerDefaults getServerDefaults() throws IOException {
+        return new FsServerDefaults(grpBlockSize, (int)grpBlockSize, 
(int)grpBlockSize, dfltReplication, 64 * 1024,
+            false, 0, DataChecksum.Type.NULL);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Replication is delegated to the secondary file system for paths in {@code PROXY} mode only. The path
+     * is converted with {@code toSecondary()} first, like in the other proxied operations of this class.
+     *
+     * @return {@code false} for non-proxy paths, otherwise the result returned by the secondary file system.
+     */
+    @Override public boolean setReplication(Path f, short replication) throws IOException {
+        return mode(f) == PROXY && secondaryFileSystem().setReplication(toSecondary(f), replication);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For paths in {@code PROXY} mode the call is delegated to the secondary file system using the converted
+     * (secondary) path. For other paths the call is a no-op when both times are {@code -1}.
+     */
+    @Override public void setTimes(Path f, long mtime, long atime) throws IOException {
+        if (mode(f) == PROXY)
+            secondaryFileSystem().setTimes(toSecondary(f), mtime, atime);
+        else {
+            // Nothing to change.
+            if (mtime == -1 && atime == -1)
+                return;
+
+            // Note the argument order of the remote client call: access time goes first.
+            rmtClient.setTimes(convert(f), atime, mtime);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public FsStatus getFsStatus() throws IOException {
+        IgfsStatus status = rmtClient.fsStatus();
+
+        return new FsStatus(status.spaceTotal(), status.spaceUsed(), 
status.spaceTotal() - status.spaceUsed());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Paths in {@code PROXY} mode are handled by the secondary file system, all other paths are updated
+     * through the remote client.
+     *
+     * @throws IOException If the file system is stopped or the remote update returned nothing.
+     */
+    @Override public void setPermission(Path p, FsPermission perm) throws IOException {
+        enterBusy();
+
+        try {
+            A.notNull(p, "p");
+
+            boolean proxy = mode(p) == PROXY;
+
+            if (proxy)
+                secondaryFileSystem().setPermission(toSecondary(p), perm);
+            else if (rmtClient.update(convert(p), permission(perm)) == null) {
+                throw new IOException("Failed to set file permission (file not found?)" +
+                    " [path=" + p + ", perm=" + perm + ']');
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Paths in {@code PROXY} mode are handled by the secondary file system; for all other paths user and
+     * group names are stored as file properties through the remote client.
+     *
+     * @throws IOException If the file system is stopped or the remote update returned nothing.
+     */
+    @Override public void setOwner(Path p, String usr, String grp) throws IOException {
+        A.notNull(p, "p");
+        A.notNull(usr, "username");
+        A.notNull(grp, "grpName");
+
+        enterBusy();
+
+        try {
+            if (mode(p) == PROXY)
+                secondaryFileSystem().setOwner(toSecondary(p), usr, grp);
+            else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, usr,
+                IgfsUtils.PROP_GROUP_NAME, grp)) == null) {
+                // The message names the operation that actually failed (owner, not permission).
+                throw new IOException("Failed to set file owner (file not found?)" +
+                    " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public FSDataInputStream open(Path f, int bufSize) throws 
IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            IgfsPath path = convert(f);
+            IgfsMode mode = modeRslvr.resolveMode(path);
+
+            if (mode == PROXY) {
+                FSDataInputStream is = 
secondaryFileSystem().open(toSecondary(f), bufSize);
+
+                if (clientLog.isLogEnabled()) {
+                    // At this point we do not know file size, so we perform 
additional request to remote FS to get it.
+                    FileStatus status = 
secondaryFileSystem().getFileStatus(toSecondary(f));
+
+                    long size = status != null ? status.getLen() : -1;
+
+                    long logId = IgfsLogger.nextId();
+
+                    clientLog.logOpen(logId, path, PROXY, bufSize, size);
+
+                    return new FSDataInputStream(new 
HadoopIgfsProxyInputStream(is, clientLog, logId));
+                }
+                else
+                    return is;
+            }
+            else {
+                HadoopIgfsStreamDelegate stream = 
seqReadsBeforePrefetchOverride ?
+                    rmtClient.open(path, seqReadsBeforePrefetch) : 
rmtClient.open(path);
+
+                long logId = -1;
+
+                if (clientLog.isLogEnabled()) {
+                    logId = IgfsLogger.nextId();
+
+                    clientLog.logOpen(logId, path, mode, bufSize, 
stream.length());
+                }
+
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Opening input stream [thread=" + 
Thread.currentThread().getName() + ", path=" + path +
+                        ", bufSize=" + bufSize + ']');
+
+                HadoopIgfsInputStream igfsIn = new 
HadoopIgfsInputStream(stream, stream.length(),
+                    bufSize, LOG, clientLog, logId);
+
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Opened input stream [path=" + path + ", 
delegate=" + stream + ']');
+
+                return new FSDataInputStream(igfsIn);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Paths in {@code PROXY} mode are created on the secondary file system (optionally wrapped into a logging
+     * proxy stream). For all other paths the stream is obtained from the remote client: with
+     * {@link CreateFlag#APPEND} the file is opened for append, otherwise it is created. The resulting stream
+     * is buffered with a buffer of at least 64 KB.
+     * <p>
+     * {@code checksumOpt} and {@code createParent} are not used by this implementation.
+     */
+    @SuppressWarnings("deprecation")
+    @Override public FSDataOutputStream createInternal(
+        Path f,
+        EnumSet<CreateFlag> flag,
+        FsPermission perm,
+        int bufSize,
+        short replication,
+        long blockSize,
+        Progressable progress,
+        Options.ChecksumOpt checksumOpt,
+        boolean createParent
+    ) throws IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+        boolean append = flag.contains(CreateFlag.APPEND);
+        boolean create = flag.contains(CreateFlag.CREATE);
+
+        OutputStream out = null;
+
+        try {
+            IgfsPath path = convert(f);
+            IgfsMode mode = modeRslvr.resolveMode(path);
+
+            if (LOG.isDebugEnabled())
+                LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + ", path=" +
+                    path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
+
+            if (mode == PROXY) {
+                FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize,
+                    replication, blockSize, progress);
+
+                if (clientLog.isLogEnabled()) {
+                    long logId = IgfsLogger.nextId();
+
+                    if (append)
+                        clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID.
+                    else
+                        clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize);
+
+                    return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId));
+                }
+                else
+                    return os;
+            }
+            else {
+                Map<String, String> permMap = F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm),
+                    IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
+
+                // Create stream and close it in the 'finally' section if any sequential operation failed.
+                HadoopIgfsStreamDelegate stream;
+
+                long logId = -1;
+
+                if (append) {
+                    stream = rmtClient.append(path, create, permMap);
+
+                    if (clientLog.isLogEnabled()) {
+                        logId = IgfsLogger.nextId();
+
+                        clientLog.logAppend(logId, path, mode, bufSize);
+                    }
+
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']');
+                }
+                else {
+                    stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize,
+                        permMap);
+
+                    if (clientLog.isLogEnabled()) {
+                        logId = IgfsLogger.nextId();
+
+                        clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize);
+                    }
+
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']');
+                }
+
+                assert stream != null;
+
+                HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG,
+                    clientLog, logId);
+
+                // Never buffer less than 64 KB.
+                bufSize = Math.max(64 * 1024, bufSize);
+
+                out = new BufferedOutputStream(igfsOut, bufSize);
+
+                FSDataOutputStream res = new FSDataOutputStream(out, null, 0);
+
+                // Mark stream created successfully.
+                out = null;
+
+                return res;
+            }
+        }
+        finally {
+            // Close if failed during stream creation.
+            if (out != null)
+                U.closeQuiet(out);
+
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Always {@code false}.
+     */
+    @Override public boolean supportsSymlinks() {
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The mode of the source path decides where the rename is executed: on the secondary file system for
+     * {@code PROXY} mode, through the remote client otherwise. The operation is logged before it is executed.
+     */
+    @Override public void renameInternal(Path src, Path dst) throws IOException {
+        A.notNull(src, "src");
+        A.notNull(dst, "dst");
+
+        enterBusy();
+
+        try {
+            IgfsPath from = convert(src);
+            IgfsPath to = convert(dst);
+
+            IgfsMode mode = modeRslvr.resolveMode(from);
+
+            if (clientLog.isLogEnabled())
+                clientLog.logRename(from, mode, to);
+
+            if (mode != PROXY)
+                rmtClient.rename(from, to);
+            else
+                secondaryFileSystem().rename(toSecondary(src), toSecondary(dst));
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For {@code PROXY} paths the deletion is logged first and then executed on the secondary file system;
+     * for other paths it is executed through the remote client and logged afterwards.
+     */
+    @Override public boolean delete(Path f, boolean recursive) throws IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            IgfsPath igfsPath = convert(f);
+
+            IgfsMode pathMode = modeRslvr.resolveMode(igfsPath);
+
+            if (pathMode != PROXY) {
+                boolean deleted = rmtClient.delete(igfsPath, recursive);
+
+                if (clientLog.isLogEnabled())
+                    clientLog.logDelete(igfsPath, pathMode, recursive);
+
+                return deleted;
+            }
+
+            if (clientLog.isLogEnabled())
+                clientLog.logDelete(igfsPath, PROXY, recursive);
+
+            return secondaryFileSystem().delete(toSecondary(f), recursive);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The flag is forwarded to the secondary file system; without a secondary file system factory the call
+     * does nothing.
+     */
+    @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException {
+        // Checksum has effect for secondary FS only.
+        if (factory == null)
+            return;
+
+        secondaryFileSystem().setVerifyChecksum(verifyChecksum);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Checksums are available for paths in {@code PROXY} mode only; the path is converted with
+     * {@code toSecondary()} before it is handed to the secondary file system.
+     *
+     * @return Checksum from the secondary file system, or {@code null} for non-proxy paths.
+     */
+    @Override public FileChecksum getFileChecksum(Path f) throws IOException {
+        if (mode(f) == PROXY)
+            return secondaryFileSystem().getFileChecksum(toSecondary(f));
+
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For {@code PROXY} paths the listing comes from the secondary file system and every entry is converted
+     * back with {@code toPrimary()}; for other paths the listing comes from the remote client. The listing is
+     * written to the client log when logging is enabled.
+     *
+     * @throws FileNotFoundException If the underlying listing is {@code null}.
+     */
+    @Override public FileStatus[] listStatus(Path f) throws IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            IgfsPath path = convert(f);
+            IgfsMode mode = modeRslvr.resolveMode(path);
+
+            FileStatus[] statuses;
+
+            if (mode == PROXY) {
+                statuses = secondaryFileSystem().listStatus(toSecondary(f));
+
+                if (statuses == null)
+                    throw new FileNotFoundException("File " + f + " does not exist.");
+
+                // Convert entries in place.
+                for (int idx = 0; idx < statuses.length; idx++)
+                    statuses[idx] = toPrimary(statuses[idx]);
+
+                if (clientLog.isLogEnabled()) {
+                    String[] names = new String[statuses.length];
+
+                    int pos = 0;
+
+                    for (FileStatus status : statuses)
+                        names[pos++] = status.getPath().toString();
+
+                    clientLog.logListDirectory(path, PROXY, names);
+                }
+            }
+            else {
+                Collection<IgfsFile> listed = rmtClient.listFiles(path);
+
+                if (listed == null)
+                    throw new FileNotFoundException("File " + f + " does not exist.");
+
+                // Work on a copy of the returned collection.
+                List<IgfsFile> snapshot = new ArrayList<>(listed);
+
+                statuses = new FileStatus[snapshot.size()];
+
+                int idx = 0;
+
+                for (IgfsFile file : snapshot)
+                    statuses[idx++] = convert(file);
+
+                if (clientLog.isLogEnabled()) {
+                    String[] names = new String[statuses.length];
+
+                    int pos = 0;
+
+                    for (FileStatus status : statuses)
+                        names[pos++] = status.getPath().toString();
+
+                    clientLog.logListDirectory(path, mode, names);
+                }
+            }
+
+            return statuses;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For {@code PROXY} paths the operation is logged and then executed on the secondary file system; for
+     * other paths it is executed through the remote client and logged afterwards. The {@code createParent}
+     * argument is not consulted by this implementation.
+     */
+    @Override public void mkdir(Path f, FsPermission perm, boolean createParent) throws IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            IgfsPath dir = convert(f);
+            IgfsMode dirMode = modeRslvr.resolveMode(dir);
+
+            if (dirMode != PROXY) {
+                rmtClient.mkdirs(dir, permission(perm));
+
+                if (clientLog.isLogEnabled())
+                    clientLog.logMakeDirectory(dir, dirMode);
+            }
+            else {
+                if (clientLog.isLogEnabled())
+                    clientLog.logMakeDirectory(dir, PROXY);
+
+                secondaryFileSystem().mkdirs(toSecondary(f), perm);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileStatus getFileStatus(Path f) throws IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            if (mode(f) == PROXY)
+                return 
toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f)));
+            else {
+                IgfsFile info = rmtClient.info(convert(f));
+
+                if (info == null)
+                    throw new FileNotFoundException("File not found: " + f);
+
+                return convert(info);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For {@code PROXY} paths block locations are requested from the secondary file system using the
+     * converted (secondary) path; otherwise they are built from the affinity information returned by the
+     * remote client.
+     */
+    @Override public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException {
+        A.notNull(path, "path");
+
+        IgfsPath igfsPath = convert(path);
+
+        enterBusy();
+
+        try {
+            if (modeRslvr.resolveMode(igfsPath) == PROXY)
+                return secondaryFileSystem().getFileBlockLocations(toSecondary(path), start, len);
+            else {
+                // Start time is used only for the debug message below.
+                long now = System.currentTimeMillis();
+
+                List<IgfsBlockLocation> affinity = new ArrayList<>(
+                    rmtClient.affinity(igfsPath, start, len));
+
+                BlockLocation[] arr = new BlockLocation[affinity.size()];
+
+                for (int i = 0; i < arr.length; i++)
+                    arr[i] = convert(affinity.get(i));
+
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" +
+                        (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']');
+
+                return arr;
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Resolve path mode.
+     *
+     * @param path HDFS path.
+     * @return Path mode.
+     */
+    public IgfsMode mode(Path path) {
+        return modeRslvr.resolveMode(convert(path));
+    }
+
+    /**
+     * Re-addresses the given path to this (primary) file system using its URI.
+     *
+     * @param path Path.
+     * @return Primary file system path.
+     */
+    private Path toPrimary(Path path) {
+        URI primaryUri = getUri();
+
+        return convertPath(path, primaryUri);
+    }
+
+    /**
+     * Re-addresses the given path to the secondary file system using the secondary URI.
+     *
+     * @param path Path.
+     * @return Secondary file system path.
+     */
+    private Path toSecondary(Path path) {
+        assert factory != null;
+        assert secondaryUri != null;
+
+        Path secPath = convertPath(path, secondaryUri);
+
+        return secPath;
+    }
+
+    /**
+     * Rebuilds the path against another URI, keeping the path component intact.
+     *
+     * @param path Old path, may be {@code null}.
+     * @param newUri New URI supplying scheme and authority.
+     * @return New path, or {@code null} if the old path is {@code null}.
+     */
+    private Path convertPath(Path path, URI newUri) {
+        assert newUri != null;
+
+        if (path == null)
+            return null;
+
+        URI oldUri = path.toUri();
+
+        // Scheme and authority are substituted only when the old path specifies them.
+        String scheme = oldUri.getScheme() != null ? newUri.getScheme() : null;
+        String authority = oldUri.getAuthority() != null ? newUri.getAuthority() : null;
+
+        try {
+            return new Path(new URI(scheme, authority, oldUri.getPath(), null, null));
+        }
+        catch (URISyntaxException e) {
+            throw new IgniteException("Failed to construct secondary file system path from the primary file " +
+                "system path: " + path, e);
+        }
+    }
+
+    /**
+     * Copies a status obtained from the secondary file system, re-addressing its path to the primary file system.
+     *
+     * @param status Secondary file system status, may be {@code null}.
+     * @return Primary file system status, or {@code null} if the given status is {@code null}.
+     */
+    private FileStatus toPrimary(FileStatus status) {
+        if (status == null)
+            return null;
+
+        return new FileStatus(
+            status.getLen(),
+            status.isDirectory(),
+            status.getReplication(),
+            status.getBlockSize(),
+            status.getModificationTime(),
+            status.getAccessTime(),
+            status.getPermission(),
+            status.getOwner(),
+            status.getGroup(),
+            toPrimary(status.getPath()));
+    }
+
+    /**
+     * Builds a Hadoop path with the IGFS scheme and this file system's authority from an IGFS path.
+     *
+     * @param path IGFS path.
+     * @return Hadoop path.
+     */
+    private Path convert(IgfsPath path) {
+        String pathStr = path.toString();
+
+        return new Path(IGFS_SCHEME, uriAuthority, pathStr);
+    }
+
+    /**
+     * Convert Hadoop path into IGFS path.
+     *
+     * @param path Hadoop path.
+     * @return IGFS path.
+     */
+    @Nullable private IgfsPath convert(Path path) {
+        if (path == null)
+            return null;
+
+        return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
+            new IgfsPath(workingDir, path.toUri().getPath());
+    }
+
+    /**
+     * Wraps an IGFS affinity block location into a Hadoop block location with a readable {@code toString()}.
+     *
+     * @param block IGFS affinity block location.
+     * @return Hadoop affinity block location.
+     */
+    private BlockLocation convert(IgfsBlockLocation block) {
+        Collection<String> nameCol = block.names();
+        Collection<String> hostCol = block.hosts();
+
+        // hostname:portNumber of data nodes.
+        String[] names = nameCol.toArray(new String[nameCol.size()]);
+
+        // Hostnames of data nodes.
+        String[] hosts = hostCol.toArray(new String[hostCol.size()]);
+
+        return new BlockLocation(names, hosts, block.start(), block.length()) {
+            @Override public String toString() {
+                try {
+                    return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() +
+                        ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']';
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds a Hadoop file status from IGFS file information.
+     * <p>
+     * Owner and group are read from the file properties, falling back to this file system's user and
+     * {@code "users"} respectively; replication is always the default replication of this file system.
+     *
+     * @param file IGFS file information.
+     * @return Hadoop file status.
+     */
+    private FileStatus convert(IgfsFile file) {
+        long len = file.length();
+        boolean dir = file.isDirectory();
+        long blockSize = file.groupBlockSize();
+        long modTime = file.modificationTime();
+        long accessTime = file.accessTime();
+        FsPermission perm = permission(file);
+        String owner = file.property(IgfsUtils.PROP_USER_NAME, user);
+        String grp = file.property(IgfsUtils.PROP_GROUP_NAME, "users");
+        Path hadoopPath = convert(file.path());
+
+        return new FileStatus(len, dir, dfltReplication, blockSize, modTime, accessTime, perm, owner, grp,
+            hadoopPath) {
+            @Override public String toString() {
+                return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]";
+            }
+        };
+    }
+
+    /**
+     * Wraps a Hadoop permission into an IGFS property map keyed by {@code IgfsUtils.PROP_PERMISSION}.
+     *
+     * @param perm Hadoop permission; {@code null} means the default permission.
+     * @return IGFS attributes.
+     */
+    private Map<String, String> permission(FsPermission perm) {
+        FsPermission effective = perm != null ? perm : FsPermission.getDefault();
+
+        return F.asMap(IgfsUtils.PROP_PERMISSION, toString(effective));
+    }
+
+    /**
+     * Formats the permission bits as an octal string zero-padded to at least four digits.
+     *
+     * @param perm Permission.
+     * @return String.
+     */
+    private static String toString(FsPermission perm) {
+        short bits = perm.toShort();
+
+        return String.format("%04o", bits);
+    }
+
+    /**
+     * Reads the permission property of an IGFS file and parses it as an octal number.
+     *
+     * @param file File info.
+     * @return Hadoop permission; the default permission if the property is absent or is not a valid octal number.
+     */
+    private FsPermission permission(IgfsFile file) {
+        String octal = file.property(IgfsUtils.PROP_PERMISSION, null);
+
+        if (octal != null) {
+            try {
+                return new FsPermission((short)Integer.parseInt(octal, 8));
+            }
+            catch (NumberFormatException ignore) {
+                // Unparsable value: fall through to the default permission.
+            }
+        }
+
+        return FsPermission.getDefault();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteHadoopFileSystem.class, this);
+    }
+
+    /**
+     * Gets the name of the user on whose behalf this file system was created.
+     *
+     * @return User name.
+     */
+    public String user() {
+        return this.user;
+    }
+
+    /**
+     * Obtains the secondary {@link FileSystem} for the current user from the factory.
+     *
+     * @return The secondary file system.
+     * @throws IOException If the factory fails to provide the file system.
+     */
+    private FileSystem secondaryFileSystem() throws IOException {
+        assert factory != null;
+
+        FileSystem secFs = factory.get(user);
+
+        return secFs;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
new file mode 100644
index 0000000..d8e70d1
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains Ignite Hadoop 2.x <code>FileSystem</code> implementation.
+ */
+package org.apache.ignite.hadoop.fs.v2;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
new file mode 100644
index 0000000..583af35
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientConfiguration;
+import org.apache.ignite.internal.client.GridClientException;
+import org.apache.ignite.internal.client.GridClientFactory;
+import 
org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopClientProtocol;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static org.apache.ignite.internal.client.GridClientProtocol.TCP;
+
+
+/**
+ * Ignite Hadoop client protocol provider.
+ * <p>
+ * Creates {@link HadoopClientProtocol} instances for configurations whose {@link MRConfig#FRAMEWORK_NAME}
+ * property equals {@link #FRAMEWORK_NAME}. Started clients are cached per endpoint address in a static map
+ * and are shared by all protocols created for that address.
+ */
+public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
+    /** Framework name used in configuration. */
+    public static final String FRAMEWORK_NAME = "ignite";
+
+    /** Client futures keyed by endpoint address. */
+    private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap =
+        new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override public ClientProtocol create(Configuration conf) throws IOException {
+        if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
+            String addr = conf.get(MRConfig.MASTER_ADDRESS);
+
+            if (F.isEmpty(addr))
+                throw new IOException("Failed to create client protocol because server address is not specified (is " +
+                    MRConfig.MASTER_ADDRESS + " property set?).");
+
+            if (F.eq(addr, "local"))
+                throw new IOException("Local execution mode is not supported, please point " +
+                    MRConfig.MASTER_ADDRESS + " to real Ignite node.");
+
+            return createProtocol(addr, conf);
+        }
+
+        // Another framework is configured: let other providers handle it.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
+        if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME)))
+            return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf);
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(ClientProtocol cliProto) throws IOException {
+        // No-op.
+    }
+
+    /**
+     * Internal protocol creation routine.
+     *
+     * @param addr Address.
+     * @param conf Configuration.
+     * @return Client protocol.
+     * @throws IOException If failed.
+     */
+    private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException {
+        return new HadoopClientProtocol(conf, client(addr));
+    }
+
+    /**
+     * Gets the cached client for the address or starts a new one.
+     * <p>
+     * The first caller for an address registers a future and starts the client; concurrent callers wait on
+     * that future. If the start fails, the future is removed from the cache so that a later call can retry
+     * instead of observing the stale failure forever.
+     *
+     * @param addr Endpoint address.
+     * @return Client.
+     * @throws IOException If failed.
+     */
+    private static GridClient client(String addr) throws IOException {
+        try {
+            IgniteInternalFuture<GridClient> fut = cliMap.get(addr);
+
+            if (fut == null) {
+                GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>();
+
+                IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0);
+
+                if (oldFut != null)
+                    return oldFut.get();
+                else {
+                    try {
+                        GridClientConfiguration cliCfg = new GridClientConfiguration();
+
+                        cliCfg.setProtocol(TCP);
+                        cliCfg.setServers(Collections.singletonList(addr));
+                        cliCfg.setMarshaller(new GridClientJdkMarshaller());
+                        cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day.
+                        cliCfg.setDaemon(true);
+
+                        GridClient cli = GridClientFactory.start(cliCfg);
+
+                        fut0.onDone(cli);
+
+                        return cli;
+                    }
+                    catch (GridClientException e) {
+                        // Do not cache the failure, otherwise this address could never be connected to again.
+                        cliMap.remove(addr, fut0);
+
+                        fut0.onDone(e);
+
+                        throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
+                    }
+                    catch (RuntimeException | Error e) {
+                        // Release threads waiting on the future and allow a later retry.
+                        cliMap.remove(addr, fut0);
+
+                        fut0.onDone(e);
+
+                        throw e;
+                    }
+                }
+            }
+            else
+                return fut.get();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
new file mode 100644
index 0000000..7635b9e
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Ignite Hadoop Accelerator map-reduce classes.
+ */
+package org.apache.ignite.hadoop.mapreduce;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
new file mode 100644
index 0000000..23eaa18
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+
+/**
+ * Hadoop attributes of a node, looked up under the {@link #NAME} node attribute.
+ */
+public class HadoopAttributes implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Attribute name. */
+    public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + ".hadoop";
+
+    /** Map-reduce planner class name. */
+    private String plannerCls;
+
+    /** External executor flag. */
+    private boolean extExec;
+
+    /** Maximum parallel tasks. */
+    private int maxParallelTasks;
+
+    /** Maximum task queue size. */
+    private int maxTaskQueueSize;
+
+    /** Library names. */
+    @GridToStringExclude
+    private String[] libNames;
+
+    /** Number of cores. */
+    private int cores;
+
+    /**
+     * Reads the Hadoop attributes stored on the given node, if any.
+     *
+     * @param node Node.
+     * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node.
+     */
+    @Nullable public static HadoopAttributes forNode(ClusterNode node) {
+        HadoopAttributes attrs = node.attribute(NAME);
+
+        return attrs;
+    }
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public HadoopAttributes() {
+        // No-op.
+    }
+
+    /**
+     * Creates attributes from the Hadoop configuration and the local runtime.
+     *
+     * @param cfg Configuration.
+     */
+    public HadoopAttributes(HadoopConfiguration cfg) {
+        assert cfg != null;
+        assert cfg.getMapReducePlanner() != null;
+
+        this.plannerCls = cfg.getMapReducePlanner().getClass().getName();
+
+        // TODO: IGNITE-404: Get from configuration when fixed.
+        this.extExec = false;
+
+        this.maxParallelTasks = cfg.getMaxParallelTasks();
+        this.maxTaskQueueSize = cfg.getMaxTaskQueueSize();
+        this.libNames = cfg.getNativeLibraryNames();
+
+        // Cores count already passed in other attributes, we add it here for convenience.
+        this.cores = Runtime.getRuntime().availableProcessors();
+    }
+
+    /**
+     * @return Map reduce planner class name.
+     */
+    public String plannerClassName() {
+        return this.plannerCls;
+    }
+
+    /**
+     * @return External execution flag.
+     */
+    public boolean externalExecution() {
+        return this.extExec;
+    }
+
+    /**
+     * @return Maximum parallel tasks.
+     */
+    public int maxParallelTasks() {
+        return this.maxParallelTasks;
+    }
+
+    /**
+     * @return Maximum task queue size.
+     */
+    public int maxTaskQueueSize() {
+        return this.maxTaskQueueSize;
+    }
+
+    /**
+     * @return Native library names.
+     */
+    public String[] nativeLibraryNames() {
+        return this.libNames;
+    }
+
+    /**
+     * @return Number of cores on machine.
+     */
+    public int cores() {
+        return this.cores;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // The field order here must mirror readExternal().
+        out.writeObject(this.plannerCls);
+        out.writeBoolean(this.extExec);
+        out.writeInt(this.maxParallelTasks);
+        out.writeInt(this.maxTaskQueueSize);
+        out.writeObject(this.libNames);
+        out.writeInt(this.cores);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // The field order here must mirror writeExternal().
+        this.plannerCls = (String)in.readObject();
+        this.extExec = in.readBoolean();
+        this.maxParallelTasks = in.readInt();
+        this.maxTaskQueueSize = in.readInt();
+        this.libNames = (String[])in.readObject();
+        this.cores = in.readInt();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        String libs = Arrays.toString(this.libNames);
+
+        return S.toString(HadoopAttributes.class, this, "libNames", libs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
new file mode 100644
index 0000000..aeda5c0
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+
+/**
+ * Base class for all Hadoop components; lifecycle callbacks other than {@code start} do nothing by default.
+ */
+public abstract class HadoopComponent {
+    /** Hadoop context. */
+    protected HadoopContext ctx;
+
+    /** Logger. */
+    protected IgniteLogger log;
+
+    /**
+     * Starts the component: stores the context and obtains a logger for the concrete component class.
+     *
+     * @param ctx Hadoop context.
+     * @throws IgniteCheckedException If start fails (this base implementation does not throw it).
+     */
+    public void start(HadoopContext ctx) throws IgniteCheckedException {
+        this.ctx = ctx;
+        this.log = ctx.kernalContext().log(getClass());
+    }
+
+    /**
+     * Stops the component.
+     *
+     * @param cancel Cancel flag (not used by this base implementation).
+     */
+    public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /**
+     * Callback invoked when all grid components are started.
+     *
+     * @throws IgniteCheckedException If the callback fails (this base implementation does not throw it).
+     */
+    public void onKernalStart() throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /**
+     * Callback invoked before all grid components are stopped.
+     *
+     * @param cancel Cancel flag (not used by this base implementation).
+     */
+    public void onKernalStop(boolean cancel) {
+        // No-op.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
new file mode 100644
index 0000000..42a3d72
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
+import 
org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskExecutorAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
+ * Hadoop accelerator context: holds the kernal context, the Hadoop configuration and the Hadoop components.
+ */
+public class HadoopContext {
+    /** Kernal context. */
+    private GridKernalContext ctx;
+
+    /** Hadoop configuration. */
+    private HadoopConfiguration cfg;
+
+    /** Job tracker. */
+    private HadoopJobTracker jobTracker;
+
+    /** External task executor. */
+    private HadoopTaskExecutorAdapter taskExecutor;
+
+    /** Shuffle. */
+    private HadoopShuffle shuffle;
+
+    /** Components in registration order. */
+    private List<HadoopComponent> components = new ArrayList<>();
+
+    /**
+     * Creates the context and registers the job tracker, the task executor and the shuffle as components,
+     * in that order.
+     *
+     * @param ctx Kernal context.
+     * @param cfg Hadoop configuration.
+     * @param jobTracker Job tracker.
+     * @param taskExecutor Task executor.
+     * @param shuffle Shuffle.
+     */
+    public HadoopContext(
+        GridKernalContext ctx,
+        HadoopConfiguration cfg,
+        HadoopJobTracker jobTracker,
+        HadoopTaskExecutorAdapter taskExecutor,
+        HadoopShuffle shuffle
+    ) {
+        this.ctx = ctx;
+        this.cfg = cfg;
+
+        this.jobTracker = add(jobTracker);
+        this.taskExecutor = add(taskExecutor);
+        this.shuffle = add(shuffle);
+    }
+
+    /**
+     * Gets the list of registered components.
+     *
+     * @return List of components.
+     */
+    public List<HadoopComponent> components() {
+        return this.components;
+    }
+
+    /**
+     * Gets kernal context.
+     *
+     * @return Grid kernal context instance.
+     */
+    public GridKernalContext kernalContext() {
+        return this.ctx;
+    }
+
+    /**
+     * Gets Hadoop configuration.
+     *
+     * @return Hadoop configuration.
+     */
+    public HadoopConfiguration configuration() {
+        return this.cfg;
+    }
+
+    /**
+     * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}.
+     *
+     * @return Local node ID.
+     */
+    public UUID localNodeId() {
+        return ctx.localNodeId();
+    }
+
+    /**
+     * Gets local node order.
+     *
+     * @return Local node order.
+     */
+    public long localNodeOrder() {
+        assert ctx.discovery() != null;
+
+        ClusterNode locNode = ctx.discovery().localNode();
+
+        return locNode.order();
+    }
+
+    /**
+     * @return Hadoop-enabled nodes.
+     */
+    public Collection<ClusterNode> nodes() {
+        return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersionEx());
+    }
+
+    /**
+     * Checks whether the local node has the lowest order among the Hadoop-enabled nodes.
+     *
+     * @return {@code True} if the local node is the Hadoop-enabled node with the lowest order.
+     */
+    public boolean jobUpdateLeader() {
+        long lowestOrder = Long.MAX_VALUE;
+        ClusterNode leader = null;
+
+        for (ClusterNode candidate : nodes()) {
+            long order = candidate.order();
+
+            if (order >= lowestOrder)
+                continue;
+
+            lowestOrder = order;
+            leader = candidate;
+        }
+
+        assert leader != null;
+
+        return localNodeId().equals(leader.id());
+    }
+
+    /**
+     * Checks whether the local node submitted the job, runs its mappers or reducers, or is the job update
+     * leader.
+     *
+     * @param meta Job metadata.
+     * @return {@code true} If local node is participating in job execution.
+     */
+    public boolean isParticipating(HadoopJobMetadata meta) {
+        UUID locId = localNodeId();
+
+        if (locId.equals(meta.submitNodeId()))
+            return true;
+
+        HadoopMapReducePlan plan = meta.mapReducePlan();
+
+        if (plan.mapperNodeIds().contains(locId))
+            return true;
+
+        if (plan.reducerNodeIds().contains(locId))
+            return true;
+
+        return jobUpdateLeader();
+    }
+
+    /**
+     * @return Job tracker instance.
+     */
+    public HadoopJobTracker jobTracker() {
+        return this.jobTracker;
+    }
+
+    /**
+     * @return Task executor.
+     */
+    public HadoopTaskExecutorAdapter taskExecutor() {
+        return this.taskExecutor;
+    }
+
+    /**
+     * @return Shuffle.
+     */
+    public HadoopShuffle shuffle() {
+        return this.shuffle;
+    }
+
+    /**
+     * @return Map-reduce planner taken from the Hadoop configuration.
+     */
+    public HadoopMapReducePlanner planner() {
+        return cfg.getMapReducePlanner();
+    }
+
+    /**
+     * Registers the component in the components list.
+     *
+     * @param c Component to add.
+     * @return The same component, for chaining.
+     */
+    private <C extends HadoopComponent> C add(C c) {
+        this.components.add(c);
+
+        return c;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
new file mode 100644
index 0000000..1382c1f
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop job info based on default Hadoop configuration.
+ */
+public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
+    /** */
+    private static final long serialVersionUID = 5489900236464999951L;
+
+    /** {@code true} If job has combiner. */
+    private boolean hasCombiner;
+
+    /** Number of reducers configured for job. */
+    private int numReduces;
+
+    /** Configuration. */
+    private Map<String,String> props = new HashMap<>();
+
+    /** Job name. */
+    private String jobName;
+
+    /** User name. */
+    private String user;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    public HadoopDefaultJobInfo() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param jobName Job name.
+     * @param user User name.
+     * @param hasCombiner {@code true} If job has combiner.
+     * @param numReduces Number of reducers configured for job.
+     * @param props All other properties of the job.
+     */
+    public HadoopDefaultJobInfo(String jobName, String user, boolean 
hasCombiner, int numReduces,
+        Map<String, String> props) {
+        this.jobName = jobName;
+        this.user = user;
+        this.hasCombiner = hasCombiner;
+        this.numReduces = numReduces;
+        this.props = props;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String property(String name) {
+        return props.get(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, 
HadoopJobId jobId, IgniteLogger log,
+        @Nullable String[] libNames) throws IgniteCheckedException {
+        assert jobCls != null;
+
+        try {
+            Constructor<? extends HadoopJob> constructor = 
jobCls.getConstructor(HadoopJobId.class,
+                HadoopDefaultJobInfo.class, IgniteLogger.class, 
String[].class);
+
+            return constructor.newInstance(jobId, this, log, libNames);
+        }
+        catch (Throwable t) {
+            if (t instanceof Error)
+                throw (Error)t;
+            
+            throw new IgniteCheckedException(t);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasCombiner() {
+        return hasCombiner;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasReducer() {
+        return reducers() > 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int reducers() {
+        return numReduces;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String jobName() {
+        return jobName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return user;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, jobName);
+        U.writeString(out, user);
+
+        out.writeBoolean(hasCombiner);
+        out.writeInt(numReduces);
+
+        U.writeStringMap(out, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        jobName = U.readString(in);
+        user = U.readString(in);
+
+        hasCombiner = in.readBoolean();
+        numReduces = in.readInt();
+
+        props = U.readStringMap(in);
+    }
+
+    /**
+     * @return Properties of the job.
+     */
+    public Map<String, String> properties() {
+        return props;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
new file mode 100644
index 0000000..ed2657e
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop facade implementation.
+ */
+public class HadoopImpl implements Hadoop {
+    /** Hadoop processor. */
+    private final HadoopProcessor proc;
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /**
+     * Constructor.
+     *
+     * @param proc Hadoop processor.
+     */
+    HadoopImpl(HadoopProcessor proc) {
+        this.proc = proc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration configuration() {
+        return proc.config();
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobId nextJobId() {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.nextJobId();
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to get next job ID (grid 
is stopping).");
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, 
HadoopJobInfo jobInfo) {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.submit(jobId, jobInfo);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to submit job (grid is 
stopping).");
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) 
throws IgniteCheckedException {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.status(jobId);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to get job status (grid is 
stopping).");
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) 
throws IgniteCheckedException {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.counters(jobId);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to get job counters (grid 
is stopping).");
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteInternalFuture<?> 
finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.finishFuture(jobId);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to get job finish future 
(grid is stopping).");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean kill(HadoopJobId jobId) throws 
IgniteCheckedException {
+        if (busyLock.enterBusy()) {
+            try {
+                return proc.kill(jobId);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to kill job (grid is 
stopping).");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
new file mode 100644
index 0000000..4e03e17
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+
+/**
+ * Hadoop counter group adapter.
+ * <p>
+ * The group keeps no counters of its own: it is a view identified by the group name, and every
+ * operation is delegated to the owning {@link HadoopMapReduceCounters} instance.
+ */
+class HadoopMapReduceCounterGroup implements CounterGroup {
+    /** Counters container all operations are delegated to. */
+    private final HadoopMapReduceCounters cntrs;
+
+    /** Group name. */
+    private final String name;
+
+    /**
+     * Creates new instance.
+     *
+     * @param cntrs Client counters instance.
+     * @param name Group name.
+     */
+    HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
+        this.cntrs = cntrs;
+        this.name = name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return name;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * No separate display name is kept: the group name is returned.
+     */
+    @Override public String getDisplayName() {
+        return name;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The passed display name is ignored.
+     */
+    @Override public void setDisplayName(String displayName) {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only the name and display name of the passed counter are used; the value is set to {@code 0}.
+     * NOTE(review): the value of {@code counter} is discarded here - confirm this is intended.
+     */
+    @Override public void addCounter(Counter counter) {
+        addCounter(counter.getName(), counter.getDisplayName(), 0);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Looks the counter up in this group through the owning counters and sets its value.
+     * The display name is not used.
+     */
+    @Override public Counter addCounter(String name, String displayName, long 
value) {
+        // 'name' parameter shadows the group name field, hence explicit 'this.name' for the group.
+        final Counter counter = cntrs.findCounter(this.name, name);
+
+        counter.setValue(value);
+
+        return counter;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The display name is not used for the lookup.
+     */
+    @Override public Counter findCounter(String counterName, String 
displayName) {
+        return cntrs.findCounter(name, counterName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName, boolean create) {
+        return cntrs.findCounter(name, counterName, create);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName) {
+        return cntrs.findCounter(name, counterName);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The size is computed by the owning counters via {@code groupSize(name)}.
+     */
+    @Override public int size() {
+        return cntrs.groupSize(name);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For every counter of {@code rightGroup} the counter with the same name in this group
+     * is incremented by that counter's value.
+     */
+    @Override public void incrAllCounters(CounterGroupBase<Counter> 
rightGroup) {
+        for (final Counter counter : rightGroup)
+            cntrs.findCounter(name, 
counter.getName()).increment(counter.getValue());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This adapter is its own underlying group.
+     */
+    @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<Counter> iterator() {
+        return cntrs.iterateGroup(name);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Not supported: always throws {@link UnsupportedOperationException}.
+     */
+    @Override public void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Not supported: always throws {@link UnsupportedOperationException}.
+     */
+    @Override public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
new file mode 100644
index 0000000..57a853f
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Counter;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ * Adapter that exposes Ignite Hadoop counters through the Hadoop {@link Counters} API.
+ * <p>
+ * Only {@link HadoopLongCounter} instances are adapted. They are kept in a map keyed by the
+ * (group name, counter name) pair and are handed out wrapped into {@link HadoopV2Counter}.
+ */
+public class HadoopMapReduceCounters extends Counters {
+    /** Long counters keyed by (group name, counter name). */
+    private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>();
+
+    /**
+     * Creates new instance based on given counters.
+     * Counters that are not {@link HadoopLongCounter} are skipped.
+     *
+     * @param cntrs Counters to adapt.
+     */
+    public HadoopMapReduceCounters(
+        org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) {
+        for (HadoopCounter cntr : cntrs.all())
+            if (cntr instanceof HadoopLongCounter)
+                this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
+        return addGroup(grp.getName(), grp.getDisplayName());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Groups are not stored: a new group view bound to this instance is returned.
+     * The display name is not used.
+     */
+    @Override public CounterGroup addGroup(String name, String displayName) {
+        return new HadoopMapReduceCounterGroup(this, name);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The counter is created if it does not exist yet.
+     */
+    @Override public Counter findCounter(String grpName, String cntrName) {
+        return findCounter(grpName, cntrName, true);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The declaring class name of the enum is used as group name, the constant name as counter name.
+     */
+    @Override public synchronized Counter findCounter(Enum<?> key) {
+        return findCounter(key.getDeclaringClass().getName(), key.name(), true);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The group name is built as {@code "FileSystem Counter (<scheme>)"}.
+     */
+    @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
+        return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Group names are collected from the stored counters on every call.
+     */
+    @Override public synchronized Iterable<String> getGroupNames() {
+        Collection<String> res = new HashSet<>();
+
+        for (HadoopCounter counter : cntrs.values())
+            res.add(counter.group());
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<CounterGroup> iterator() {
+        final Iterator<String> iter = getGroupNames().iterator();
+
+        return new Iterator<CounterGroup>() {
+            @Override public boolean hasNext() {
+                return iter.hasNext();
+            }
+
+            @Override public CounterGroup next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+
+                return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next());
+            }
+
+            @Override public void remove() {
+                throw new UnsupportedOperationException("not implemented");
+            }
+        };
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * A new group view is returned on every call, whether or not the group has counters.
+     */
+    @Override public synchronized CounterGroup getGroup(String grpName) {
+        return new HadoopMapReduceCounterGroup(this, grpName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int countCounters() {
+        return cntrs.size();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Not supported: always throws {@link UnsupportedOperationException}.
+     */
+    @Override public synchronized void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Not supported: always throws {@link UnsupportedOperationException}.
+     */
+    @Override public synchronized void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Counters missing in this instance are created before being incremented.
+     */
+    @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
+        for (CounterGroup group : other) {
+            for (Counter counter : group) {
+                findCounter(group.getName(), counter.getName()).increment(counter.getValue());
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object genericRight) {
+        if (!(genericRight instanceof HadoopMapReduceCounters))
+            return false;
+
+        return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return cntrs.hashCode();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The flag is ignored.
+     */
+    @Override public void setWriteAllCounters(boolean snd) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean getWriteAllCounters() {
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Limits are not tracked by this adapter: {@code null} is returned.
+     */
+    @Override public Limits limits() {
+        return null;
+    }
+
+    /**
+     * Returns size of a group.
+     *
+     * @param grpName Name of the group.
+     * @return Number of counters in the given group.
+     */
+    public int groupSize(String grpName) {
+        int res = 0;
+
+        for (HadoopCounter counter : cntrs.values()) {
+            if (grpName.equals(counter.group()))
+                res++;
+        }
+
+        return res;
+    }
+
+    /**
+     * Returns counters iterator for specified group.
+     * The iterator runs over a list of wrappers built at call time.
+     *
+     * @param grpName Name of the group to iterate.
+     * @return Counters iterator.
+     */
+    public Iterator<Counter> iterateGroup(String grpName) {
+        Collection<Counter> grpCounters = new ArrayList<>();
+
+        for (HadoopLongCounter counter : cntrs.values()) {
+            if (grpName.equals(counter.group()))
+                grpCounters.add(new HadoopV2Counter(counter));
+        }
+
+        return grpCounters.iterator();
+    }
+
+    /**
+     * Find a counter in the group.
+     *
+     * @param grpName The name of the counter group.
+     * @param cntrName The name of the counter.
+     * @param create Create the counter if not found if true.
+     * @return The counter that was found or added, or {@code null} if the counter does not exist
+     *      and {@code create} is {@code false}.
+     */
+    public Counter findCounter(String grpName, String cntrName, boolean create) {
+        T2<String, String> key = new T2<>(grpName, cntrName);
+
+        HadoopLongCounter internalCntr = cntrs.get(key);
+
+        if (internalCntr == null && create) {
+            internalCntr = new HadoopLongCounter(grpName, cntrName);
+
+            // Store the very instance that is returned to the caller: otherwise updates made
+            // through the returned counter would never be visible in this container.
+            cntrs.put(key, internalCntr);
+        }
+
+        return internalCntr == null ? null : new HadoopV2Counter(internalCntr);
+    }
+}
\ No newline at end of file

Reply via email to