http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
new file mode 100644
index 0000000..6b5c776
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -0,0 +1,580 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
+import org.apache.ignite.igfs.IgfsException;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
+import org.apache.ignite.igfs.IgfsPathNotFoundException;
+import org.apache.ignite.igfs.IgfsUserContext;
+import 
org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties;
+import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
+import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemV2;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Secondary file system which delegates calls to an instance of Hadoop {@link 
FileSystem}.
+ * <p>
+ * Target {@code FileSystem}'s are created on per-user basis using passed 
{@link HadoopFileSystemFactory}.
+ */
+public class IgniteHadoopIgfsSecondaryFileSystem implements 
IgfsSecondaryFileSystemV2, LifecycleAware,
+    HadoopPayloadAware {
+    /** The default user name. It is used if no user context is set. */
+    private String dfltUsrName;
+
+    /** Factory. */
+    private HadoopFileSystemFactory fsFactory;
+
+    /**
+     * Default constructor for Spring.
+     * <p>
+     * Leaves both the default user name and the file system factory unset; they can be supplied later
+     * through {@link #setDefaultUserName(String)} and {@link #setFileSystemFactory(HadoopFileSystemFactory)}.
+     */
+    public IgniteHadoopIgfsSecondaryFileSystem() {
+        // No-op.
+    }
+
+    /**
+     * Simple constructor that is to be used by default.
+     * <p>
+     * Delegates to the three-argument constructor with no configuration path and no user name.
+     *
+     * @param uri URI of file system.
+     * @throws IgniteCheckedException In case of error.
+     * @deprecated Configure the file system factory property instead (see {@link #getFileSystemFactory()}
+     *      and {@link #setFileSystemFactory(HadoopFileSystemFactory)}).
+     */
+    @Deprecated
+    public IgniteHadoopIgfsSecondaryFileSystem(String uri) throws 
IgniteCheckedException {
+        this(uri, null, null);
+    }
+
+    /**
+     * Constructor.
+     * <p>
+     * Delegates to the three-argument constructor with no user name.
+     *
+     * @param uri URI of file system.
+     * @param cfgPath Additional path to Hadoop configuration.
+     * @throws IgniteCheckedException In case of error.
+     * @deprecated Configure the file system factory property instead (see {@link #getFileSystemFactory()}
+     *      and {@link #setFileSystemFactory(HadoopFileSystemFactory)}).
+     */
+    @Deprecated
+    public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable 
String cfgPath)
+        throws IgniteCheckedException {
+        this(uri, cfgPath, null);
+    }
+
+    /**
+     * Constructor.
+     * <p>
+     * Stores the user name as the default user name and installs a {@link CachingHadoopFileSystemFactory}
+     * configured with the given URI and, when provided, the given configuration path.
+     *
+     * @param uri URI of file system.
+     * @param cfgPath Additional path to Hadoop configuration.
+     * @param userName User name.
+     * @throws IgniteCheckedException In case of error.
+     * @deprecated Configure the file system factory property instead (see {@link #getFileSystemFactory()}
+     *      and {@link #setFileSystemFactory(HadoopFileSystemFactory)}).
+     */
+    @Deprecated
+    public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable 
String cfgPath,
+        @Nullable String userName) throws IgniteCheckedException {
+        setDefaultUserName(userName);
+
+        CachingHadoopFileSystemFactory fac = new 
CachingHadoopFileSystemFactory();
+
+        fac.setUri(uri);
+
+        // The configuration path is optional and is only passed on when present.
+        if (cfgPath != null)
+            fac.setConfigPaths(cfgPath);
+
+        setFileSystemFactory(fac);
+    }
+
+    /**
+     * Gets default user name.
+     * <p>
+     * Defines user name which will be used during file system invocation in case no user name is defined
+     * explicitly through {@link FileSystem#get(URI, Configuration, String)}.
+     * <p>
+     * Also this name will be used if you manipulate {@link IgniteFileSystem} directly and do not set user
+     * name explicitly using {@link IgfsUserContext#doAs(String, IgniteOutClosure)} or
+     * {@link IgfsUserContext#doAs(String, Callable)} methods.
+     * <p>
+     * If not set, the value of system property {@code "user.name"} will be used. If this property is not
+     * set either, {@code "anonymous"} will be used.
+     *
+     * @return Default user name.
+     */
+    @Nullable public String getDefaultUserName() {
+        return dfltUsrName;
+    }
+
+    /**
+     * Sets default user name. See {@link #getDefaultUserName()} for details.
+     * <p>
+     * The value is stored as is; {@link #start()} later passes it through {@code IgfsUtils.fixUserName()}.
+     *
+     * @param dfltUsrName Default user name.
+     */
+    public void setDefaultUserName(@Nullable String dfltUsrName) {
+        this.dfltUsrName = dfltUsrName;
+    }
+
+    /**
+     * Gets secondary file system factory.
+     * <p>
+     * This factory will be used whenever a call to a target {@link FileSystem} is required.
+     * <p>
+     * If not set, {@link CachingHadoopFileSystemFactory} will be used; that default is installed by
+     * {@link #start()}.
+     *
+     * @return Secondary file system factory.
+     */
+    public HadoopFileSystemFactory getFileSystemFactory() {
+        return fsFactory;
+    }
+
+    /**
+     * Sets secondary file system factory. See {@link #getFileSystemFactory()} for details.
+     * <p>
+     * The factory is only stored here; if it is {@link LifecycleAware}, it is started by {@link #start()}.
+     *
+     * @param factory Secondary file system factory.
+     */
+    public void setFileSystemFactory(HadoopFileSystemFactory factory) {
+        this.fsFactory = factory;
+    }
+
+    /**
+     * Translates an IGFS path into a fully qualified Hadoop path, taking the scheme and authority from
+     * the URI of the current user's file system.
+     *
+     * @param path IGFS path.
+     * @return Hadoop path.
+     */
+    private Path convert(IgfsPath path) {
+        URI fsUri = fileSystemForUser().getUri();
+
+        String scheme = fsUri.getScheme();
+        String authority = fsUri.getAuthority();
+
+        return new Path(scheme, authority, path.toString());
+    }
+
+    /**
+     * Converts an I/O error raised by the secondary file system into an IGFS exception.
+     * <p>
+     * This is a thin wrapper over {@link #cast(String, IOException)}; no further analysis of the error
+     * is performed here.
+     *
+     * @param e Exception to convert.
+     * @param detailMsg Detailed error message.
+     * @return Appropriate exception.
+     */
+    private IgfsException handleSecondaryFsError(IOException e, String 
detailMsg) {
+        return cast(detailMsg, e);
+    }
+
+    /**
+     * Cast IO exception to IGFS exception.
+     * <p>
+     * The exception type is selected by the runtime type of {@code e}; anything not recognized is wrapped
+     * into a plain {@link IgfsException}. Note that {@code msg} is not used for
+     * {@link FileNotFoundException} and {@link PathIsNotEmptyDirectoryException}: those are wrapped with
+     * the cause only.
+     *
+     * @param msg Error message.
+     * @param e IO exception.
+     * @return IGFS exception.
+     */
+    public static IgfsException cast(String msg, IOException e) {
+        if (e instanceof FileNotFoundException)
+            return new IgfsPathNotFoundException(e);
+        else if (e instanceof ParentNotDirectoryException)
+            return new IgfsParentNotDirectoryException(msg, e);
+        else if (e instanceof PathIsNotEmptyDirectoryException)
+            return new IgfsDirectoryNotEmptyException(e);
+        else if (e instanceof PathExistsException)
+            return new IgfsPathAlreadyExistsException(msg, e);
+        else
+            return new IgfsException(msg, e);
+    }
+
+    /**
+     * Builds the IGFS attribute map (permission, owner, group) for a Hadoop file status.
+     * <p>
+     * The permission is rendered as a four-digit octal string; the default permission is used when
+     * the status carries none.
+     *
+     * @param status File status.
+     * @return IGFS attributes.
+     */
+    private static Map<String, String> properties(FileStatus status) {
+        FsPermission statusPerm = status.getPermission();
+        FsPermission effectivePerm = statusPerm != null ? statusPerm : FsPermission.getDefault();
+
+        String octalPerm = String.format("%04o", effectivePerm.toShort());
+
+        HashMap<String, String> attrs = new HashMap<>(3);
+
+        attrs.put(IgfsUtils.PROP_PERMISSION, octalPerm);
+        attrs.put(IgfsUtils.PROP_USER_NAME, status.getOwner());
+        attrs.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup());
+
+        return attrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists(IgfsPath path) {
+        try {
+            FileSystem fs = fileSystemForUser();
+            Path hadoopPath = convert(path);
+
+            return fs.exists(hadoopPath);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
+        HadoopIgfsProperties parsed = new HadoopIgfsProperties(props);
+
+        final FileSystem fs = fileSystemForUser();
+
+        try {
+            String usr = parsed.userName();
+            String grp = parsed.groupName();
+
+            // Ownership is touched only when at least one of user/group is given.
+            boolean ownerGiven = usr != null || grp != null;
+
+            if (ownerGiven)
+                fs.setOwner(convert(path), usr, grp);
+
+            FsPermission perm = parsed.permission();
+
+            if (perm != null)
+                fs.setPermission(convert(path), perm);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
+        }
+
+        // The secondary file system does not report the updated file back.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rename(IgfsPath src, IgfsPath dest) {
+        boolean renamed;
+
+        try {
+            // Hand the operation over to the secondary file system.
+            renamed = fileSystemForUser().rename(convert(src), convert(dest));
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
+        }
+
+        if (!renamed)
+            throw new IgfsException("Failed to rename (secondary file system returned false) " +
+                "[src=" + src + ", dest=" + dest + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(IgfsPath path, boolean recursive) {
+        try {
+            FileSystem fs = fileSystemForUser();
+            Path hadoopPath = convert(path);
+
+            return fs.delete(hadoopPath, recursive);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path) {
+        boolean created;
+
+        try {
+            created = fileSystemForUser().mkdirs(convert(path));
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
+        }
+
+        if (!created)
+            throw new IgniteException("Failed to make directories [path=" + path + "]");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
+        boolean created;
+
+        try {
+            FileSystem fs = fileSystemForUser();
+            Path hadoopPath = convert(path);
+
+            created = fs.mkdirs(hadoopPath, new HadoopIgfsProperties(props).permission());
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
+        }
+
+        if (!created)
+            throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
+        FileStatus[] children;
+
+        try {
+            children = fileSystemForUser().listStatus(convert(path));
+        }
+        catch (FileNotFoundException ignored) {
+            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e,
+                "Failed to list statuses due to secondary file system exception: " + path);
+        }
+
+        // A null listing is treated the same way as a missing path.
+        if (children == null)
+            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+
+        Collection<IgfsPath> paths = new ArrayList<>(children.length);
+
+        for (FileStatus child : children) {
+            String childName = child.getPath().getName();
+
+            paths.add(new IgfsPath(path, childName));
+        }
+
+        return paths;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
+        FileStatus[] children;
+
+        try {
+            children = fileSystemForUser().listStatus(convert(path));
+        }
+        catch (FileNotFoundException ignored) {
+            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e,
+                "Failed to list statuses due to secondary file system exception: " + path);
+        }
+
+        // A null listing is treated the same way as a missing path.
+        if (children == null)
+            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+
+        Collection<IgfsFile> files = new ArrayList<>(children.length);
+
+        for (FileStatus child : children) {
+            IgfsEntryInfo entry;
+
+            if (child.isDirectory()) {
+                entry = IgfsUtils.createDirectory(
+                    IgniteUuid.randomUuid(),
+                    null,
+                    properties(child),
+                    child.getAccessTime(),
+                    child.getModificationTime()
+                );
+            }
+            else {
+                entry = IgfsUtils.createFile(
+                    IgniteUuid.randomUuid(),
+                    (int)child.getBlockSize(),
+                    child.getLen(),
+                    null,
+                    null,
+                    false,
+                    properties(child),
+                    child.getAccessTime(),
+                    child.getModificationTime()
+                );
+            }
+
+            IgfsPath childPath = new IgfsPath(path, child.getPath().getName());
+
+            files.add(new IgfsFileImpl(childPath, entry, 1));
+        }
+
+        return files;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath 
path, int bufSize) {
+        return new 
HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), 
convert(path), bufSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
+        try {
+            FileSystem fs = fileSystemForUser();
+            Path hadoopPath = convert(path);
+
+            return fs.create(hadoopPath, overwrite);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
+        long blockSize, @Nullable Map<String, String> props) {
+        // Missing properties are treated as an empty property set.
+        Map<String, String> effectiveProps = props != null ? props : Collections.<String, String>emptyMap();
+
+        HadoopIgfsProperties parsed = new HadoopIgfsProperties(effectiveProps);
+
+        try {
+            FileSystem fs = fileSystemForUser();
+            Path hadoopPath = convert(path);
+
+            return fs.create(hadoopPath, parsed.permission(), overwrite, bufSize, (short)replication,
+                blockSize, null);
+        }
+        catch (IOException e) {
+            String errMsg = "Failed to create file [path=" + path + ", props=" + props +
+                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
+                ", blockSize=" + blockSize + "]";
+
+            throw handleSecondaryFsError(e, errMsg);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * When {@code create} is {@code true} and the target file does not exist, the file is created
+     * (without overwrite) and the stream of the new file is returned. Otherwise the call is delegated to
+     * the append operation of the underlying file system. The {@code props} argument is currently not
+     * applied by this implementation.
+     */
+    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
+        @Nullable Map<String, String> props) {
+        try {
+            FileSystem fs = fileSystemForUser();
+            Path hadoopPath = convert(path);
+
+            // Appending to a missing file fails in Hadoop, so the 'create' flag has to be honored here.
+            if (create && !fs.exists(hadoopPath))
+                return fs.create(hadoopPath, false, bufSize);
+
+            return fs.append(hadoopPath, bufSize);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns a read-only view over the Hadoop file status of the given path, or {@code null} when the
+     * status is {@code null} or the underlying file system reports {@link FileNotFoundException}.
+     */
+    @Override public IgfsFile info(final IgfsPath path) {
+        try {
+            final FileStatus status = 
fileSystemForUser().getFileStatus(convert(path));
+
+            if (status == null)
+                return null;
+
+            // Attributes are computed once and shared by all property accessors below.
+            final Map<String, String> props = properties(status);
+
+            return new IgfsFile() {
+                @Override public IgfsPath path() {
+                    return path;
+                }
+
+                @Override public boolean isFile() {
+                    return status.isFile();
+                }
+
+                @Override public boolean isDirectory() {
+                    return status.isDirectory();
+                }
+
+                @Override public int blockSize() {
+                    // By convention directory has blockSize == 0, while file has blockSize > 0:
+                    return isDirectory() ? 0 : (int)status.getBlockSize();
+                }
+
+                @Override public long groupBlockSize() {
+                    // Same source value as blockSize(), but not narrowed to int.
+                    return status.getBlockSize();
+                }
+
+                @Override public long accessTime() {
+                    return status.getAccessTime();
+                }
+
+                @Override public long modificationTime() {
+                    return status.getModificationTime();
+                }
+
+                @Override public String property(String name) throws 
IllegalArgumentException {
+                    String val = props.get(name);
+
+                    // Unlike the two-argument overload, a missing property is an error here.
+                    if (val ==  null)
+                        throw new IllegalArgumentException("File property not 
found [path=" + path + ", name=" + name + ']');
+
+                    return val;
+                }
+
+                @Nullable @Override public String property(String name, 
@Nullable String dfltVal) {
+                    String val = props.get(name);
+
+                    return val == null ? dfltVal : val;
+                }
+
+                @Override public long length() {
+                    return status.getLen();
+                }
+
+                /** {@inheritDoc} */
+                @Override public Map<String, String> properties() {
+                    return props;
+                }
+            };
+        }
+        catch (FileNotFoundException ignore) {
+            // A missing path is reported as null rather than as an exception.
+            return null;
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get file status [path=" 
+ path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long usedSpaceSize() {
+        try {
+            FileSystem fs = fileSystemForUser();
+
+            // FileSystem#getUsed() is not used since it counts only the files in the filesystem root,
+            // not all the files recursively; the content summary of the root is queried instead.
+            return fs.getContentSummary(new Path("/")).getSpaceConsumed();
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTimes(IgfsPath path, long accessTime, long 
modificationTime) throws IgniteException {
+        try {
+            // Hadoop's FileSystem#setTimes expects the modification time before the access time,
+            // hence the argument order differs from this method's signature.
+            fileSystemForUser().setTimes(convert(path), modificationTime, 
accessTime);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed set times for path: " + 
path);
+        }
+    }
+
+    /**
+     * Gets the underlying {@link FileSystem}.
+     * This method is used solely for testing.
+     * <p>
+     * The instance returned is the one resolved for the current context user (or for the default user
+     * when no user context is set).
+     *
+     * @return the underlying Hadoop {@link FileSystem}.
+     */
+    public FileSystem fileSystem() {
+        return fileSystemForUser();
+    }
+
+    /**
+     * Resolves the {@link FileSystem} to work with: the one of the current context user or, when no
+     * user context is set, the one of the default user.
+     *
+     * @return the FileSystem instance, never null.
+     */
+    private FileSystem fileSystemForUser() {
+        String ctxUser = IgfsUserContext.currentUser();
+
+        // Fall back to the (normalized) default user name when there is no user context.
+        String effectiveUser = F.isEmpty(ctxUser) ? IgfsUtils.fixUserName(dfltUsrName) : ctxUser;
+
+        assert !F.isEmpty(effectiveUser);
+
+        try {
+            return fsFactory.get(effectiveUser);
+        }
+        catch (IOException ioe) {
+            throw new IgniteException(ioe);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Normalizes the default user name, installs a {@link CachingHadoopFileSystemFactory} when no factory
+     * was configured, and starts the factory if it is {@link LifecycleAware}.
+     */
+    @Override public void start() throws IgniteException {
+        dfltUsrName = IgfsUtils.fixUserName(dfltUsrName);
+
+        // Default factory, used when none was configured explicitly.
+        if (fsFactory == null)
+            fsFactory = new CachingHadoopFileSystemFactory();
+
+        if (fsFactory instanceof LifecycleAware)
+            ((LifecycleAware) fsFactory).start();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stops the file system factory if it is {@link LifecycleAware}; otherwise does nothing.
+     */
+    @Override public void stop() throws IgniteException {
+        if (fsFactory instanceof LifecycleAware)
+            ((LifecycleAware)fsFactory).stop();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The payload of this secondary file system is its file system factory.
+     */
+    @Override public HadoopFileSystemFactory getPayload() {
+        return fsFactory;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
new file mode 100644
index 0000000..bbfbc59
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Secure Hadoop file system factory that can work with underlying file system 
protected with Kerberos.
+ * It uses "impersonation" mechanism, to be able to work on behalf of 
arbitrary client user.
+ * Please see 
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
 for details.
+ * The principal and the key tab name to be used for Kerberos authentication 
are set explicitly
+ * in the factory configuration.
+ *
+ * <p>This factory does not cache any file system instances. Unless
+ * {@code "fs.[prefix].impl.disable.cache"} is set to {@code true}, file system instances will be cached
+ * by Hadoop.
+ */
+public class KerberosHadoopFileSystemFactory extends 
BasicHadoopFileSystemFactory {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** The default interval used to re-login from the key tab, in 
milliseconds. */
+    public static final long DFLT_RELOGIN_INTERVAL = 10 * 60 * 1000L;
+
+    /** Keytab full file name. */
+    private String keyTab;
+
+    /** Keytab principal. */
+    private String keyTabPrincipal;
+
+    /** The re-login interval. See {@link #getReloginInterval()} for more 
information. */
+    private long reloginInterval = DFLT_RELOGIN_INTERVAL;
+
+    /** Time of last re-login attempt, in system milliseconds. */
+    private transient volatile long lastReloginTime;
+
+    /**
+     * Constructor.
+     */
+    public KerberosHadoopFileSystemFactory() {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Gives the factory a chance to re-login from the key tab (see {@code reloginIfNeeded()}) before
+     * delegating to the parent implementation.
+     */
+    @Override public FileSystem getWithMappedName(String name) throws 
IOException {
+        reloginIfNeeded();
+
+        return super.getWithMappedName(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
+        // The requested user is impersonated on top of the current login user.
+        UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName, loginUgi);
+
+        PrivilegedExceptionAction<FileSystem> openFs = new PrivilegedExceptionAction<FileSystem>() {
+            @Override public FileSystem run() throws Exception {
+                return FileSystem.get(fullUri, cfg);
+            }
+        };
+
+        return proxyUgi.doAs(openFs);
+    }
+
+    /**
+     * Gets the key tab principal short name (e.g. "hdfs").
+     * <p>
+     * May be {@code null} until configured; {@link #start()} rejects an empty value.
+     *
+     * @return The key tab principal.
+     */
+    @Nullable public String getKeyTabPrincipal() {
+        return keyTabPrincipal;
+    }
+
+    /**
+     * Sets the key tab principal name. See {@link #getKeyTabPrincipal()} for more information.
+     *
+     * @param keyTabPrincipal The key tab principal name.
+     */
+    public void setKeyTabPrincipal(@Nullable String keyTabPrincipal) {
+        this.keyTabPrincipal = keyTabPrincipal;
+    }
+
+    /**
+     * Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or
+     * "/etc/krb5.keytab").
+     * <p>
+     * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of
+     * {@link IgniteHadoopFileSystem} resides. Corresponding path must exist on these machines as well.
+     * <p>
+     * May be {@code null} until configured; {@link #start()} rejects an empty value.
+     *
+     * @return The key tab file name.
+     */
+    @Nullable public String getKeyTab() {
+        return keyTab;
+    }
+
+    /**
+     * Sets the key tab file name. See {@link #getKeyTab()} for more information.
+     *
+     * @param keyTab The key tab file name.
+     */
+    public void setKeyTab(@Nullable String keyTab) {
+        this.keyTab = keyTab;
+    }
+
+    /**
+     * The interval used to re-login from the key tab, in milliseconds.
+     * It is important that the value is not larger than the Kerberos ticket life time multiplied by 0.2.
+     * This is because the ticket renew window starts from {@code 0.8 * ticket life time}.
+     * Default ticket life time is 1 day (24 hours), so the default re-login interval (10 min)
+     * obeys this rule well.
+     *
+     * <p>Zero value means that re-login should be attempted on each file system operation.
+     * Negative values are not allowed.
+     *
+     * <p>Note, however, that it does not make sense to make this value small, because Hadoop does not
+     * allow to login if less than
+     * {@link org.apache.hadoop.security.UserGroupInformation#MIN_TIME_BEFORE_RELOGIN} milliseconds
+     * have passed since the time of the previous login.
+     * See {@link org.apache.hadoop.security.UserGroupInformation#hasSufficientTimeElapsed(long)} and its
+     * usages for more detail.
+     *
+     * @return The re-login interval, in milliseconds.
+     */
+    public long getReloginInterval() {
+        return reloginInterval;
+    }
+
+    /**
+     * Sets the relogin interval in milliseconds. See {@link #getReloginInterval()} for more information.
+     * <p>
+     * The value is not validated here; a negative interval is rejected later by {@link #start()}.
+     *
+     * @param reloginInterval The re-login interval, in milliseconds.
+     */
+    public void setReloginInterval(long reloginInterval) {
+        this.reloginInterval = reloginInterval;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Validates the Kerberos settings (key tab, key tab principal, re-login interval), starts the parent
+     * factory and then logs in from the key tab using the configuration prepared by the parent.
+     *
+     * @throws IgniteException If login from the key tab fails.
+     */
+    @Override public void start() throws IgniteException {
+        A.ensure(!F.isEmpty(keyTab), "keyTab cannot be empty.");
+        A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot be empty.");
+        A.ensure(reloginInterval >= 0, "reloginInterval cannot be negative.");
+
+        // The parent must be started first: the login below relies on the configuration it prepares.
+        super.start();
+
+        try {
+            UserGroupInformation.setConfiguration(cfg);
+            UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab);
+        }
+        catch (IOException ioe) {
+            throw new IgniteException("Failed login from keytab [keyTab=" + keyTab +
+                ", keyTabPrincipal=" + keyTabPrincipal + ']', ioe);
+        }
+    }
+
+    /**
+     * Re-logins the user if needed.
+     * First, the re-login interval defined in the factory is checked. Re-login attempts will not be more
+     * frequent than one attempt per {@code reloginInterval}.
+     * Second, the {@link UserGroupInformation#checkTGTAndReloginFromKeytab()} method is invoked, which
+     * gets the existing TGT and checks its validity. If the TGT is expired or is close to expiry, it
+     * performs re-login.
+     *
+     * <p>This operation is expected to be called upon each operation with the file system created with
+     * the factory. As long as the {@link #get(String)} operation is invoked upon each
+     * {@link IgniteHadoopFileSystem} operation, there is no need to invoke it specially.
+     *
+     * <p>No lock is taken here, so concurrent callers may both pass the interval check and both invoke
+     * the TGT check.
+     *
+     * @throws IOException If login fails.
+     */
+    private void reloginIfNeeded() throws IOException {
+        long now = System.currentTimeMillis();
+
+        if (now >= lastReloginTime + reloginInterval) {
+            UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+
+            // Updated only after the check returned normally; a failure leaves the old timestamp.
+            lastReloginTime = now;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Writes the parent state followed by the key tab, the key tab principal and the re-login interval.
+     * The order must match {@link #readExternal(ObjectInput)}.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        U.writeString(out, keyTab);
+        U.writeString(out, keyTabPrincipal);
+        out.writeLong(reloginInterval);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads the parent state followed by the key tab, the key tab principal and the re-login interval,
+     * in the same order as written by {@link #writeExternal(ObjectOutput)}.
+     */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        super.readExternal(in);
+
+        keyTab = U.readString(in);
+        keyTabPrincipal = U.readString(in);
+        reloginInterval = in.readLong();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/package-info.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/package-info.java
new file mode 100644
index 0000000..164801f
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Ignite Hadoop Accelerator file system API.
+ */
+package org.apache.ignite.hadoop.fs;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
new file mode 100644
index 0000000..a06129e
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -0,0 +1,1364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs.v1;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsException;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathSummary;
+import org.apache.ignite.internal.igfs.common.IgfsLogger;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
+import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
+import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream;
+import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream;
+import 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper;
+import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
+import org.apache.ignite.internal.processors.igfs.IgfsPaths;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
+import static 
org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
+import static org.apache.ignite.igfs.IgfsMode.PROXY;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
+import static 
org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
+import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME;
+
+/**
+ * {@code IGFS} Hadoop 1.x file system driver over file system API. To use
+ * {@code IGFS} as Hadoop file system, you should configure this class
+ * in Hadoop's {@code core-site.xml} as follows:
+ * <pre name="code" class="xml">
+ *  &lt;property&gt;
+ *      &lt;name&gt;fs.default.name&lt;/name&gt;
+ *      &lt;value&gt;igfs:///&lt;/value&gt;
+ *  &lt;/property&gt;
+ *
+ *  &lt;property&gt;
+ *      &lt;name&gt;fs.igfs.impl&lt;/name&gt;
+ *      
&lt;value&gt;org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem&lt;/value&gt;
+ *  &lt;/property&gt;
+ * </pre>
+ * You should also add Ignite JAR and all libraries to Hadoop classpath. To
+ * do this, add following lines to {@code conf/hadoop-env.sh} script in Hadoop
+ * distribution:
+ * <pre name="code" class="bash">
+ * export IGNITE_HOME=/path/to/Ignite/distribution
+ * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
+ *
+ * for f in $IGNITE_HOME/libs/*.jar; do
+ *  export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
+ * done
+ * </pre>
+ * <h1 class="header">Data vs Client Nodes</h1>
+ * Hadoop needs to use its FileSystem remotely from client nodes as well as 
directly on
+ * data nodes. Client nodes are responsible for basic file system operations 
as well as
+ * accessing data nodes remotely. Usually, client nodes are started together
+ * with {@code job-submitter} or {@code job-scheduler} processes, while data 
nodes are usually
+ * started together with Hadoop {@code task-tracker} processes.
+ * <p>
+ * For sample client and data node configuration refer to {@code 
config/hadoop/default-config-client.xml}
+ * and {@code config/hadoop/default-config.xml} configuration files in Ignite 
installation.
+ */
+public class IgniteHadoopFileSystem extends FileSystem {
+    /** Internal property to indicate management connection. */
+    public static final String IGFS_MANAGEMENT = 
"fs.igfs.management.connection";
+
+    /** Empty array of file block locations. */
+    private static final BlockLocation[] EMPTY_BLOCK_LOCATIONS = new 
BlockLocation[0];
+
+    /** Empty array of file statuses. */
+    public static final FileStatus[] EMPTY_FILE_STATUS = new FileStatus[0];
+
+    /** Ensures that close routine is invoked at most once. */
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Grid remote client. */
+    private HadoopIgfsWrapper rmtClient;
+
+    /** Working directory. */
+    private Path workingDir;
+
+    /** Default replication factor. */
+    private short dfltReplication;
+
+    /** Base file system uri. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private URI uri;
+
+    /** Authority. */
+    private String uriAuthority;
+
+    /** Client logger. */
+    private IgfsLogger clientLog;
+
+    /** Secondary file system URI. */
+    private URI secondaryUri;
+
+    /** The user name this file system was created on behalf of. */
+    private String user;
+
+    /** IGFS mode resolver. */
+    private IgfsModeResolver modeRslvr;
+
+    /** The secondary file system factory. */
+    private HadoopFileSystemFactory factory;
+
+    /** Management connection flag. */
+    private boolean mgmt;
+
+    /** Whether custom sequential reads before prefetch value is provided. */
+    private boolean seqReadsBeforePrefetchOverride;
+
+    /** IGFS group block size. */
+    private long igfsGrpBlockSize;
+
+    /** Flag that controls whether file writes should be colocated. */
+    private boolean colocateFileWrites;
+
+    /** Prefer local writes. */
+    private boolean preferLocFileWrites;
+
+    /** Custom-provided sequential reads before prefetch. */
+    private int seqReadsBeforePrefetch;
+
+    /** {@inheritDoc} */
+    @Override public URI getUri() {
+        if (uri != null)
+            return uri;
+
+        // The URI is assigned in initialize() and reset to null in close0().
+        throw new IllegalStateException("URI is null (was IgniteHadoopFileSystem properly initialized?).");
+    }
+
+    /**
+     * Enter busy state.
+     *
+     * @throws IOException If file system is stopped.
+     */
+    private void enterBusy() throws IOException {
+        if (closeGuard.get())
+            throw new IOException("File system is stopped.");
+    }
+
+    /**
+     * Leave busy state. Currently does nothing; it is paired with {@link #enterBusy()} in {@code try/finally} blocks.
+     */
+    private void leaveBusy() {
+        // No-op.
+    }
+
+    /**
+     * Resolves the name of the current user from the Hadoop file system viewpoint.
+     *
+     * @return Short name of the current user passed through {@code IgfsUtils.fixUserName()}; never {@code null}.
+     * @throws IOException If the current user cannot be obtained.
+     */
+    public static String getFsHadoopUser() throws IOException {
+        String usrName = IgfsUtils.fixUserName(UserGroupInformation.getCurrentUser().getShortUserName());
+
+        assert usrName != null;
+
+        return usrName;
+    }
+
+    /**
+     * Public setter that can be used by direct users of FS or Visor. The flag is passed to the IGFS client
+     * whenever a file is created through it.
+     *
+     * @param colocateFileWrites Whether all ongoing file writes should be colocated.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public void colocateFileWrites(boolean colocateFileWrites) {
+        this.colocateFileWrites = colocateFileWrites;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads the file system parameters from the configuration, connects to IGFS, performs the handshake,
+     * sets up the client logger and, if at least one path works in {@code PROXY} mode, obtains the secondary
+     * file system from the factory received in the handshake response.
+     */
+    @SuppressWarnings("ConstantConditions")
+    @Override public void initialize(URI name, Configuration cfg) throws IOException {
+        enterBusy();
+
+        try {
+            if (rmtClient != null)
+                throw new IOException("File system is already initialized: " + rmtClient);
+
+            A.notNull(name, "name");
+            A.notNull(cfg, "cfg");
+
+            super.initialize(name, cfg);
+
+            setConf(cfg);
+
+            mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false);
+
+            if (!IGFS_SCHEME.equals(name.getScheme()))
+                throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME +
+                    "://[name]/[optional_path], actual=" + name + ']');
+
+            uri = name;
+
+            uriAuthority = uri.getAuthority();
+
+            user = getFsHadoopUser();
+
+            // Override sequential reads before prefetch if needed.
+            seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
+
+            if (seqReadsBeforePrefetch > 0)
+                seqReadsBeforePrefetchOverride = true;
+
+            // In Ignite replication factor is controlled by data cache affinity.
+            // We use replication factor to force the whole file to be stored on local node.
+            dfltReplication = (short)cfg.getInt("dfs.replication", 3);
+
+            // Get file colocation control flag.
+            colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false);
+            preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false);
+
+            // Get log directory.
+            String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR);
+
+            File logDirFile = U.resolveIgnitePath(logDirCfg);
+
+            String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
+
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
+
+            // Handshake.
+            IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
+
+            igfsGrpBlockSize = handshake.blockSize();
+
+            IgfsPaths paths = handshake.secondaryPaths();
+
+            // Initialize client logger.
+            Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false);
+
+            // Sampling flag from the handshake, when present, takes precedence over the configuration parameter.
+            if (handshake.sampling() != null ? handshake.sampling() : logEnabled) {
+                // Initiate client logger.
+                if (logDir == null)
+                    throw new IOException("Failed to resolve log directory: " + logDirCfg);
+
+                Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE);
+
+                clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize);
+            }
+            else
+                clientLog = IgfsLogger.disabledLogger();
+
+            try {
+                modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes());
+            }
+            catch (IgniteCheckedException ice) {
+                throw new IOException(ice);
+            }
+
+            // Secondary file system is needed only if the default mode or any path mode is PROXY.
+            boolean initSecondary = paths.defaultMode() == PROXY;
+
+            if (!initSecondary && paths.pathModes() != null && !paths.pathModes().isEmpty()) {
+                for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
+                    IgfsMode mode = pathMode.getValue();
+
+                    if (mode == PROXY) {
+                        initSecondary = true;
+
+                        break;
+                    }
+                }
+            }
+
+            if (initSecondary) {
+                try {
+                    factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to get secondary file system factory.", e);
+                }
+
+                if (factory == null)
+                    throw new IOException("Failed to get secondary file system factory (did you set " +
+                        IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFileSystem\" in " +
+                        FileSystemConfiguration.class.getName() + "?)");
+
+                if (factory instanceof LifecycleAware)
+                    ((LifecycleAware) factory).start();
+
+                try {
+                    FileSystem secFs = factory.get(user);
+
+                    secondaryUri = secFs.getUri();
+
+                    A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
+                }
+                catch (IOException e) {
+                    // A management connection tolerates a missing secondary file system; a regular one does not.
+                    if (!mgmt)
+                        throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
+                    else
+                        LOG.warn("Visor failed to create secondary file system (operations on paths with PROXY mode " +
+                            "will have no effect): " + e.getMessage());
+                }
+            }
+
+            // Set working directory to the home directory of the current Fs user:
+            setWorkingDirectory(null);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * A path whose URI is not absolute (has no scheme) is always accepted. An absolute one must have the IGFS
+     * scheme and the same authority as this file system.
+     */
+    @Override protected void checkPath(Path path) {
+        URI pathUri = path.toUri();
+
+        if (pathUri.isAbsolute()) {
+            // Report the offending scheme (not the authority) when the scheme check fails.
+            if (!F.eq(pathUri.getScheme(), IGFS_SCHEME))
+                throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" +
+                    pathUri.getScheme() + ']');
+
+            if (!F.eq(pathUri.getAuthority(), uriAuthority))
+                throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" +
+                    pathUri.getAuthority() + ']');
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public short getDefaultReplication() {
+        // Value of "dfs.replication" read in initialize() (3 if the property is not set).
+        return dfltReplication;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void finalize() throws Throwable {
+        super.finalize();
+
+        // Safety net for instances that were never closed explicitly; close() runs its routine at most once.
+        close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IOException {
+        // Only the first caller flips the guard; every subsequent call is a no-op.
+        if (!closeGuard.compareAndSet(false, true))
+            return;
+
+        close0();
+    }
+
+    /**
+     * Closes file system: the parent {@link FileSystem}, the remote client, the client logger and, if it is
+     * {@link LifecycleAware}, the secondary file system factory. Does nothing if the remote client was never created.
+     *
+     * @throws IOException If failed.
+     */
+    private void close0() throws IOException {
+        if (LOG.isDebugEnabled())
+            LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
+
+        if (rmtClient == null)
+            return;
+
+        super.close();
+
+        rmtClient.close(false);
+
+        // Client logger is still null if initialize() failed after the remote client had been created
+        // (e.g. on handshake failure), so guard against NPE here.
+        if (clientLog != null && clientLog.isLogEnabled())
+            clientLog.close();
+
+        if (factory instanceof LifecycleAware)
+            ((LifecycleAware) factory).stop();
+
+        // Reset initialized resources.
+        uri = null;
+        rmtClient = null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For a path in {@code PROXY} mode the call goes to the secondary file system, otherwise to the IGFS client.
+     */
+    @Override public void setTimes(Path p, long mtime, long atime) throws IOException {
+        enterBusy();
+
+        try {
+            A.notNull(p, "p");
+
+            if (mode(p) != PROXY) {
+                // Note the argument order: the IGFS client is given the access time first.
+                rmtClient.setTimes(convert(p), atime, mtime);
+
+                return;
+            }
+
+            final FileSystem secFs = secondaryFileSystem();
+
+            if (secFs != null)
+                secFs.setTimes(toSecondary(p), mtime, atime);
+            else
+                // No-op for management connection.
+                assert mgmt;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For a path in {@code PROXY} mode the call goes to the secondary file system, otherwise the permission is
+     * stored through an IGFS properties update.
+     */
+    @Override public void setPermission(Path p, FsPermission perm) throws IOException {
+        enterBusy();
+
+        try {
+            A.notNull(p, "p");
+
+            if (mode(p) != PROXY) {
+                // A null update result is reported as a failure.
+                if (rmtClient.update(convert(p), permission(perm)) == null) {
+                    throw new IOException("Failed to set file permission (file not found?)" +
+                        " [path=" + p + ", perm=" + perm + ']');
+                }
+
+                return;
+            }
+
+            final FileSystem secFs = secondaryFileSystem();
+
+            if (secFs != null)
+                secFs.setPermission(toSecondary(p), perm);
+            else
+                // No-op for management connection.
+                assert mgmt;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For a path in {@code PROXY} mode the call goes to the secondary file system, otherwise the user and group
+     * names are stored through an IGFS properties update.
+     */
+    @Override public void setOwner(Path p, String username, String grpName) throws IOException {
+        A.notNull(p, "p");
+        A.notNull(username, "username");
+        A.notNull(grpName, "grpName");
+
+        enterBusy();
+
+        try {
+            if (mode(p) == PROXY) {
+                final FileSystem secondaryFs = secondaryFileSystem();
+
+                if (secondaryFs == null) {
+                    assert mgmt;
+
+                    // No-op for management connection.
+                    return;
+                }
+
+                secondaryFs.setOwner(toSecondary(p), username, grpName);
+            }
+            else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, username,
+                IgfsUtils.PROP_GROUP_NAME, grpName)) == null) {
+                // A null update result is reported as a failure; the message names the owner, not the permission.
+                throw new IOException("Failed to set file owner (file not found?)" +
+                    " [path=" + p + ", userName=" + username + ", groupName=" + grpName + ']');
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public FSDataInputStream open(Path f, int bufSize) throws 
IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            IgfsPath path = convert(f);
+            IgfsMode mode = mode(path);
+
+            if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFileSystem();
+
+                if (secondaryFs == null) {
+                    assert mgmt;
+
+                    throw new IOException("Failed to open file (secondary file 
system is not initialized): " + f);
+                }
+
+                FSDataInputStream is = secondaryFs.open(toSecondary(f), 
bufSize);
+
+                if (clientLog.isLogEnabled()) {
+                    // At this point we do not know file size, so we perform 
additional request to remote FS to get it.
+                    FileStatus status = 
secondaryFs.getFileStatus(toSecondary(f));
+
+                    long size = status != null ? status.getLen() : -1;
+
+                    long logId = IgfsLogger.nextId();
+
+                    clientLog.logOpen(logId, path, PROXY, bufSize, size);
+
+                    return new FSDataInputStream(new 
HadoopIgfsProxyInputStream(is, clientLog, logId));
+                }
+                else
+                    return is;
+            }
+            else {
+                HadoopIgfsStreamDelegate stream = 
seqReadsBeforePrefetchOverride ?
+                    rmtClient.open(path, seqReadsBeforePrefetch) : 
rmtClient.open(path);
+
+                long logId = -1;
+
+                if (clientLog.isLogEnabled()) {
+                    logId = IgfsLogger.nextId();
+
+                    clientLog.logOpen(logId, path, mode, bufSize, 
stream.length());
+                }
+
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Opening input stream [thread=" + 
Thread.currentThread().getName() + ", path=" + path +
+                        ", bufSize=" + bufSize + ']');
+
+                HadoopIgfsInputStream igfsIn = new 
HadoopIgfsInputStream(stream, stream.length(),
+                    bufSize, LOG, clientLog, logId);
+
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Opened input stream [path=" + path + ", 
delegate=" + stream + ']');
+
+                return new FSDataInputStream(igfsIn);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For a path in {@code PROXY} mode the file is created on the secondary file system, otherwise through the IGFS
+     * client with the output buffered by at least 64 KB. When client logging is enabled, the operation is logged.
+     */
+    @SuppressWarnings("deprecation")
+    @Override public FSDataOutputStream create(Path f, final FsPermission perm, boolean overwrite, int bufSize,
+        short replication, long blockSize, Progressable progress) throws IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        OutputStream out = null;
+
+        try {
+            IgfsPath path = convert(f);
+            IgfsMode mode = mode(path);
+
+            if (LOG.isDebugEnabled())
+                LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + ", path=" +
+                    path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
+
+            if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFileSystem();
+
+                if (secondaryFs == null) {
+                    assert mgmt;
+
+                    throw new IOException("Failed to create file (secondary file system is not initialized): " + f);
+                }
+
+                FSDataOutputStream os =
+                    secondaryFs.create(toSecondary(f), perm, overwrite, bufSize, replication, blockSize, progress);
+
+                if (clientLog.isLogEnabled()) {
+                    long logId = IgfsLogger.nextId();
+
+                    clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize);
+
+                    return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId));
+                }
+                else
+                    return os;
+            }
+            else {
+                Map<String,String> propMap = permission(perm);
+
+                propMap.put(IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
+
+                // Create stream and close it in the 'finally' section if any sequential operation failed.
+                HadoopIgfsStreamDelegate stream = rmtClient.create(path, overwrite, colocateFileWrites,
+                    replication, blockSize, propMap);
+
+                assert stream != null;
+
+                // Log ID stays -1 when client logging is disabled.
+                long logId = -1;
+
+                if (clientLog.isLogEnabled()) {
+                    logId = IgfsLogger.nextId();
+
+                    clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize);
+                }
+
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']');
+
+                HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, clientLog,
+                    logId);
+
+                // Buffer is never smaller than 64 KB.
+                bufSize = Math.max(64 * 1024, bufSize);
+
+                out = new BufferedOutputStream(igfsOut, bufSize);
+
+                FSDataOutputStream res = new FSDataOutputStream(out, null, 0);
+
+                // Mark stream created successfully.
+                out = null;
+
+                return res;
+            }
+        }
+        finally {
+            // Close if failed during stream creation.
+            if (out != null)
+                U.closeQuiet(out);
+
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public FSDataOutputStream append(Path f, int bufSize, 
Progressable progress) throws IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            IgfsPath path = convert(f);
+            IgfsMode mode = mode(path);
+
+            if (LOG.isDebugEnabled())
+                LOG.debug("Opening output stream in append [thread=" + 
Thread.currentThread().getName() +
+                    ", path=" + path + ", bufSize=" + bufSize + ']');
+
+            if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFileSystem();
+
+                if (secondaryFs == null) {
+                    assert mgmt;
+
+                    throw new IOException("Failed to append file (secondary 
file system is not initialized): " + f);
+                }
+
+                FSDataOutputStream os = secondaryFs.append(toSecondary(f), 
bufSize, progress);
+
+                if (clientLog.isLogEnabled()) {
+                    long logId = IgfsLogger.nextId();
+
+                    clientLog.logAppend(logId, path, PROXY, bufSize); // Don't 
have stream ID.
+
+                    return new FSDataOutputStream(new 
HadoopIgfsProxyOutputStream(os, clientLog, logId));
+                }
+                else
+                    return os;
+            }
+            else {
+                HadoopIgfsStreamDelegate stream = rmtClient.append(path, 
false, null);
+
+                assert stream != null;
+
+                long logId = -1;
+
+                if (clientLog.isLogEnabled()) {
+                    logId = IgfsLogger.nextId();
+
+                    clientLog.logAppend(logId, path, mode, bufSize);
+                }
+
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Opened output stream in append [path=" + path + 
", delegate=" + stream + ']');
+
+                HadoopIgfsOutputStream igfsOut = new 
HadoopIgfsOutputStream(stream, LOG, clientLog,
+                    logId);
+
+                bufSize = Math.max(64 * 1024, bufSize);
+
+                BufferedOutputStream out = new BufferedOutputStream(igfsOut, 
bufSize);
+
+                return new FSDataOutputStream(out, null, 0);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public boolean rename(Path src, Path dst) throws IOException {
+        A.notNull(src, "src");
+        A.notNull(dst, "dst");
+
+        enterBusy();
+
+        try {
+            IgfsPath srcPath = convert(src);
+            IgfsPath dstPath = convert(dst);
+            IgfsMode mode = mode(srcPath);
+
+            if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFileSystem();
+
+                if (secondaryFs == null) {
+                    assert mgmt;
+
+                    return false;
+                }
+
+                if (clientLog.isLogEnabled())
+                    clientLog.logRename(srcPath, PROXY, dstPath);
+
+                return secondaryFs.rename(toSecondary(src), toSecondary(dst));
+            }
+            else {
+                if (clientLog.isLogEnabled())
+                    clientLog.logRename(srcPath, mode, dstPath);
+
+                try {
+                    rmtClient.rename(srcPath, dstPath);
+                }
+                catch (IOException ioe) {
+                    // Log the exception before rethrowing since it may be 
ignored:
+                    LOG.warn("Failed to rename [srcPath=" + srcPath + ", 
dstPath=" + dstPath + ", mode=" + mode + ']',
+                        ioe);
+
+                    throw ioe;
+                }
+
+                return true;
+            }
+        }
+        catch (IOException e) {
+            // Intentionally ignore IGFS exceptions here to follow Hadoop 
contract.
+            if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null 
||
+                !X.hasCause(e.getCause(), IgfsException.class)))
+                throw e;
+            else
+                return false;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public boolean delete(Path f) throws IOException {
+        // Non-recursive delete.
+        return delete(f, false);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public boolean delete(Path f, boolean recursive) throws 
IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            IgfsPath path = convert(f);
+            IgfsMode mode = mode(path);
+
+            if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFileSystem();
+
+                if (secondaryFs == null) {
+                    assert mgmt;
+
+                    return false;
+                }
+
+                if (clientLog.isLogEnabled())
+                    clientLog.logDelete(path, PROXY, recursive);
+
+                return secondaryFs.delete(toSecondary(f), recursive);
+            }
+            else {
+                // Will throw exception if delete failed.
+                boolean res = rmtClient.delete(path, recursive);
+
+                if (clientLog.isLogEnabled())
+                    clientLog.logDelete(path, mode, recursive);
+
+                return res;
+            }
+        }
+        catch (IOException e) {
+            // Intentionally ignore IGFS exceptions here to follow Hadoop 
contract.
+            if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null 
||
+                !X.hasCause(e.getCause(), IgfsException.class)))
+                throw e;
+            else
+                return false;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * In {@code PROXY} mode the listing comes from the secondary file system and every returned status is
+     * converted with {@code toPrimary}; an empty array is returned when no secondary file system is
+     * available. In any other mode the listing is obtained from {@code rmtClient} and each entry is
+     * converted to a Hadoop {@link FileStatus}.
+     *
+     * @throws FileNotFoundException If the underlying listing call returned {@code null}.
+     */
+    @Override public FileStatus[] listStatus(Path f) throws IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            IgfsPath igfsPath = convert(f);
+            IgfsMode pathMode = mode(igfsPath);
+
+            FileStatus[] statuses;
+
+            if (pathMode == PROXY) {
+                final FileSystem secFs = secondaryFileSystem();
+
+                if (secFs == null) {
+                    assert mgmt;
+
+                    return EMPTY_FILE_STATUS;
+                }
+
+                statuses = secFs.listStatus(toSecondary(f));
+
+                if (statuses == null)
+                    throw new FileNotFoundException("File " + f + " does not exist.");
+
+                // Rewrite secondary file system paths in place so that callers see primary paths.
+                for (int idx = 0; idx < statuses.length; idx++)
+                    statuses[idx] = toPrimary(statuses[idx]);
+            }
+            else {
+                Collection<IgfsFile> listed = rmtClient.listFiles(igfsPath);
+
+                if (listed == null)
+                    throw new FileNotFoundException("File " + f + " does not exist.");
+
+                // Work on a snapshot of the returned collection.
+                List<IgfsFile> snapshot = new ArrayList<>(listed);
+
+                statuses = new FileStatus[snapshot.size()];
+
+                int idx = 0;
+
+                for (IgfsFile file : snapshot)
+                    statuses[idx++] = convert(file);
+            }
+
+            if (clientLog.isLogEnabled()) {
+                String[] names = new String[statuses.length];
+
+                for (int idx = 0; idx < names.length; idx++)
+                    names[idx] = statuses[idx].getPath().toString();
+
+                clientLog.logListDirectory(igfsPath, pathMode, names);
+            }
+
+            return statuses;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The home directory is {@code /user/<user>}, qualified against {@link #getUri()}.
+     */
+    @Override public Path getHomeDirectory() {
+        return new Path("/user/" + user).makeQualified(getUri(), null);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * A {@code null} argument resets the working directory to {@link #getHomeDirectory()}. Any other value
+     * is passed through {@code fixRelativePart} and validated with {@code DFSUtil.isValidName}. The resulting
+     * directory is also set on the secondary file system, if there is one, before it is stored locally.
+     *
+     * @throws IllegalArgumentException If the path component of the new directory is not a valid DFS name.
+     * @throws RuntimeException If an {@link IOException} is raised while the new directory is applied.
+     */
+    @Override public void setWorkingDirectory(Path newPath) {
+        try {
+            Path target;
+
+            if (newPath == null)
+                target = getHomeDirectory();
+            else {
+                target = fixRelativePart(newPath);
+
+                String dirName = target.toUri().getPath();
+
+                if (!DFSUtil.isValidName(dirName))
+                    throw new IllegalArgumentException("Invalid DFS directory name " + dirName);
+            }
+
+            FileSystem secFs = secondaryFileSystem();
+
+            if (secFs != null)
+                secFs.setWorkingDirectory(toSecondary(target));
+
+            workingDir = target;
+        }
+        catch (IOException e) {
+            throw new RuntimeException("Failed to obtain secondary file system instance.", e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the {@code workingDir} field, which is assigned by {@link #setWorkingDirectory(Path)}.
+     */
+    @Override public Path getWorkingDirectory() {
+        return workingDir;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public boolean mkdirs(Path f, FsPermission perm) throws 
IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            IgfsPath path = convert(f);
+            IgfsMode mode = mode(path);
+
+            if (mode == PROXY) {
+                final FileSystem secondaryFs = secondaryFileSystem();
+
+                if (secondaryFs == null) {
+                    assert mgmt;
+
+                    return false;
+                }
+
+                if (clientLog.isLogEnabled())
+                    clientLog.logMakeDirectory(path, PROXY);
+
+                return secondaryFs.mkdirs(toSecondary(f), perm);
+            }
+            else {
+                boolean mkdirRes = rmtClient.mkdirs(path, permission(perm));
+
+                if (clientLog.isLogEnabled())
+                    clientLog.logMakeDirectory(path, mode);
+
+                return mkdirRes;
+            }
+        }
+        catch (IOException e) {
+            // Intentionally ignore IGFS exceptions here to follow Hadoop 
contract.
+            if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null 
||
+                !X.hasCause(e.getCause(), IgfsException.class)))
+                throw e;
+            else
+                return false;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileStatus getFileStatus(Path f) throws IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            if (mode(f) == PROXY) {
+                final FileSystem secondaryFs = secondaryFileSystem();
+
+                if (secondaryFs == null) {
+                    assert mgmt;
+
+                    throw new IOException("Failed to get file status 
(secondary file system is not initialized): " + f);
+                }
+
+                return toPrimary(secondaryFs.getFileStatus(toSecondary(f)));
+            }
+            else {
+                IgfsFile info = rmtClient.info(convert(f));
+
+                if (info == null)
+                    throw new FileNotFoundException("File not found: " + f);
+
+                return convert(info);
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public ContentSummary getContentSummary(Path f) throws 
IOException {
+        A.notNull(f, "f");
+
+        enterBusy();
+
+        try {
+            if (mode(f) == PROXY) {
+                final FileSystem secondaryFs = secondaryFileSystem();
+
+                if (secondaryFs == null) {
+                    assert mgmt;
+
+                    throw new IOException("Failed to get content summary 
(secondary file system is not initialized): " +
+                        f);
+                }
+
+                return secondaryFs.getContentSummary(toSecondary(f));
+            }
+            else {
+                IgfsPathSummary sum = rmtClient.contentSummary(convert(f));
+
+                return new ContentSummary(sum.totalLength(), sum.filesCount(), 
sum.directoriesCount(),
+                    -1, sum.totalLength(), rmtClient.fsStatus().spaceTotal());
+            }
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public BlockLocation[] getFileBlockLocations(FileStatus status, 
long start, long len) throws IOException {
+        A.notNull(status, "status");
+
+        enterBusy();
+
+        try {
+            IgfsPath path = convert(status.getPath());
+
+            if (mode(status.getPath()) == PROXY) {
+                final FileSystem secondaryFs = secondaryFileSystem();
+
+                if (secondaryFs == null) {
+                    assert mgmt;
+
+                    return EMPTY_BLOCK_LOCATIONS;
+                }
+
+                Path secPath = toSecondary(status.getPath());
+
+                return 
secondaryFs.getFileBlockLocations(secondaryFs.getFileStatus(secPath), start, 
len);
+            }
+            else {
+                long now = System.currentTimeMillis();
+
+                List<IgfsBlockLocation> affinity = new 
ArrayList<>(rmtClient.affinity(path, start, len));
+
+                BlockLocation[] arr = new BlockLocation[affinity.size()];
+
+                for (int i = 0; i < arr.length; i++)
+                    arr[i] = convert(affinity.get(i));
+
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Fetched file locations [path=" + path + ", 
fetchTime=" +
+                        (System.currentTimeMillis() - now) + ", locations=" + 
Arrays.asList(arr) + ']');
+
+                return arr;
+            }
+        }
+        catch (FileNotFoundException ignored) {
+            return EMPTY_BLOCK_LOCATIONS;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the value of the {@code igfsGrpBlockSize} field (presumably the IGFS group block size;
+     * confirm where the field is initialized).
+     */
+    @SuppressWarnings("deprecation")
+    @Override public long getDefaultBlockSize() {
+        return igfsGrpBlockSize;
+    }
+
+    /**
+     * Resolve path mode.
+     * <p>
+     * The Hadoop path is first converted to an IGFS path and then resolved by {@link #mode(IgfsPath)}.
+     *
+     * @param path HDFS path.
+     * @return Path mode.
+     */
+    public IgfsMode mode(Path path) {
+        return mode(convert(path));
+    }
+
+    /**
+     * Resolve path mode.
+     * <p>
+     * Delegates to {@code modeRslvr.resolveMode(path)}.
+     *
+     * @param path IGFS path.
+     * @return Path mode.
+     */
+    public IgfsMode mode(IgfsPath path) {
+        return modeRslvr.resolveMode(path);
+    }
+
+    /**
+     * Tells whether a secondary file system is configured, i.e. whether the {@code factory} field is set.
+     * Only the presence of the factory is checked; no file system instance is requested from it.
+     *
+     * @return {@code true} If the secondary file system factory is not {@code null}.
+     */
+    public boolean hasSecondaryFileSystem() {
+        return factory != null;
+    }
+
+    /**
+     * Convert the given path to path acceptable by the primary file system.
+     * <p>
+     * Delegates to {@code convertPath} with the {@code uri} field as the target URI.
+     *
+     * @param path Path.
+     * @return Primary file system path.
+     */
+    private Path toPrimary(Path path) {
+        return convertPath(path, uri);
+    }
+
+    /**
+     * Convert the given path to path acceptable by the secondary file system.
+     * <p>
+     * Delegates to {@code convertPath} with {@code secondaryUri} as the target URI. Both {@code factory}
+     * and {@code secondaryUri} are asserted to be non-{@code null}.
+     *
+     * @param path Path.
+     * @return Secondary file system path.
+     */
+    private Path toSecondary(Path path) {
+        // Sanity checks: both fields are expected to be set before any path is converted.
+        assert factory != null;
+        assert secondaryUri != null;
+
+        return convertPath(path, secondaryUri);
+    }
+
+    /**
+     * Convert path using the given new URI.
+     * <p>
+     * The scheme and the authority of {@code newUri} are applied only where the original path already has
+     * a scheme or an authority, respectively. The path component is kept as is, and neither query nor
+     * fragment is set. This helper serves both conversion directions, primary to secondary and back.
+     *
+     * @param path Old path, possibly {@code null}.
+     * @param newUri New URI.
+     * @return New path, or {@code null} if {@code path} is {@code null}.
+     * @throws IgniteException If the converted URI is syntactically invalid.
+     */
+    private Path convertPath(Path path, URI newUri) {
+        assert newUri != null;
+
+        if (path == null)
+            return null;
+
+        URI pathUri = path.toUri();
+
+        // Keep relative or scheme-less paths as they are: substitute only the parts that are present.
+        String scheme = pathUri.getScheme() != null ? newUri.getScheme() : null;
+        String authority = pathUri.getAuthority() != null ? newUri.getAuthority() : null;
+
+        try {
+            return new Path(new URI(scheme, authority, pathUri.getPath(), null, null));
+        }
+        catch (URISyntaxException e) {
+            // The message must not name a conversion direction since the method is used for both of them.
+            throw new IgniteException("Failed to convert file system path to the target URI [path=" + path +
+                ", newUri=" + newUri + ']', e);
+        }
+    }
+
+    /**
+     * Convert a file status obtained from the secondary file system to a status of the primary file system.
+     * <p>
+     * All attributes are copied unchanged except the path, which is converted to a primary file system path.
+     *
+     * @param status Secondary file system status.
+     * @return Primary file system status, or {@code null} if {@code status} is {@code null}.
+     */
+    @SuppressWarnings("deprecation")
+    private FileStatus toPrimary(FileStatus status) {
+        if (status == null)
+            return null;
+
+        return new FileStatus(
+            status.getLen(),
+            status.isDir(),
+            status.getReplication(),
+            status.getBlockSize(),
+            status.getModificationTime(),
+            status.getAccessTime(),
+            status.getPermission(),
+            status.getOwner(),
+            status.getGroup(),
+            toPrimary(status.getPath()));
+    }
+
+    /**
+     * Convert IGFS path into Hadoop path.
+     * <p>
+     * The result is built from {@code IGFS_SCHEME}, the {@code uriAuthority} field and the string form of
+     * the IGFS path.
+     *
+     * @param path IGFS path.
+     * @return Hadoop path.
+     */
+    private Path convert(IgfsPath path) {
+        return new Path(IGFS_SCHEME, uriAuthority, path.toString());
+    }
+
+    /**
+     * Convert Hadoop path into IGFS path.
+     * <p>
+     * An absolute path is converted from its URI path component alone; a relative one is resolved against
+     * the current working directory.
+     *
+     * @param path Hadoop path.
+     * @return IGFS path, or {@code null} if {@code path} is {@code null}.
+     */
+    @Nullable private IgfsPath convert(@Nullable Path path) {
+        if (path == null)
+            return null;
+
+        if (path.isAbsolute())
+            return new IgfsPath(path.toUri().getPath());
+
+        // Relative path: use the converted working directory as the parent.
+        return new IgfsPath(convert(workingDir), path.toUri().getPath());
+    }
+
+    /**
+     * Convert IGFS affinity block location into Hadoop affinity block location.
+     * <p>
+     * The returned instance overrides {@code toString()} to print offset, length, hosts and names.
+     *
+     * @param block IGFS affinity block location.
+     * @return Hadoop affinity block location.
+     */
+    private BlockLocation convert(IgfsBlockLocation block) {
+        Collection<String> nodeNames = block.names();
+        Collection<String> nodeHosts = block.hosts();
+
+        // hostname:portNumber of data nodes.
+        String[] nameArr = nodeNames.toArray(new String[nodeNames.size()]);
+
+        // Hostnames of data nodes.
+        String[] hostArr = nodeHosts.toArray(new String[nodeHosts.size()]);
+
+        return new BlockLocation(nameArr, hostArr, block.start(), block.length()) {
+            @Override public String toString() {
+                try {
+                    StringBuilder sb = new StringBuilder("BlockLocation [offset=");
+
+                    sb.append(getOffset());
+                    sb.append(", length=").append(getLength());
+                    sb.append(", hosts=").append(Arrays.asList(getHosts()));
+                    sb.append(", names=").append(Arrays.asList(getNames()));
+                    sb.append(']');
+
+                    return sb.toString();
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Convert IGFS file information into Hadoop file status.
+     *
+     * @param file IGFS file information.
+     * @return Hadoop file status.
+     */
+    @SuppressWarnings("deprecation")
+    private FileStatus convert(IgfsFile file) {
+        return new FileStatus(
+            file.length(),
+            file.isDirectory(),
+            getDefaultReplication(),
+            file.groupBlockSize(),
+            file.modificationTime(),
+            file.accessTime(),
+            permission(file),
+            file.property(IgfsUtils.PROP_USER_NAME, user),
+            file.property(IgfsUtils.PROP_GROUP_NAME, "users"),
+            convert(file.path())) {
+            @Override public String toString() {
+                return "FileStatus [path=" + getPath() + ", isDir=" + isDir() 
+ ", len=" + getLen() +
+                    ", mtime=" + getModificationTime() + ", atime=" + 
getAccessTime() + ']';
+            }
+        };
+    }
+
+    /**
+     * Convert Hadoop permission into IGFS file attribute.
+     * <p>
+     * The result maps {@code IgfsUtils.PROP_PERMISSION} to the permission's string form.
+     *
+     * @param perm Hadoop permission; {@code null} stands for {@code FsPermission.getDefault()}.
+     * @return IGFS attributes.
+     */
+    private Map<String, String> permission(FsPermission perm) {
+        FsPermission effective = perm != null ? perm : FsPermission.getDefault();
+
+        return F.asMap(IgfsUtils.PROP_PERMISSION, toString(effective));
+    }
+
+    /**
+     * Format the permission as an octal string, zero-padded to at least four digits
+     * (the {@code %04o} format applied to {@code perm.toShort()}).
+     *
+     * @param perm Permission.
+     * @return String.
+     */
+    private static String toString(FsPermission perm) {
+        return String.format("%04o", perm.toShort());
+    }
+
+    /**
+     * Convert IGFS file attributes into Hadoop permission.
+     * <p>
+     * The {@code IgfsUtils.PROP_PERMISSION} property is parsed as an octal number. The default permission
+     * is used when the property is missing or cannot be parsed.
+     *
+     * @param file File info.
+     * @return Hadoop permission.
+     */
+    private FsPermission permission(IgfsFile file) {
+        String octal = file.property(IgfsUtils.PROP_PERMISSION, null);
+
+        if (octal != null) {
+            try {
+                return new FsPermission((short)Integer.parseInt(octal, 8));
+            }
+            catch (NumberFormatException ignore) {
+                // Malformed value: fall through to the default permission.
+            }
+        }
+
+        return FsPermission.getDefault();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The string is produced by {@code S.toString} for {@code IgniteHadoopFileSystem} and this instance.
+     */
+    @Override public String toString() {
+        return S.toString(IgniteHadoopFileSystem.class, this);
+    }
+
+    /**
+     * Returns the user name this file system is created on behalf of.
+     *
+     * @return The user name.
+     */
+    public String user() {
+        return user;
+    }
+
+    /**
+     * Gets cached or creates a {@link FileSystem}.
+     * <p>
+     * The instance is requested from the {@code factory} for the current {@code user}.
+     *
+     * @return The secondary file system, or {@code null} if no factory is set.
+     * @throws IOException If the factory fails to provide the file system.
+     */
+    private @Nullable FileSystem secondaryFileSystem() throws IOException {
+        return factory == null ? null : factory.get(user);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java
new file mode 100644
index 0000000..60e62ca
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains Ignite Hadoop 1.x <code>FileSystem</code> implementation.
+ */
+package org.apache.ignite.hadoop.fs.v1;
\ No newline at end of file

Reply via email to