http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopRawLocalFileSystem.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopRawLocalFileSystem.java
new file mode 100644
index 0000000..b8fc8e7
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopRawLocalFileSystem.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.nio.file.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Local file system implementation for Hadoop.
+ */
+public class HadoopRawLocalFileSystem extends FileSystem {
+    /**
+     * Working directory for each thread. The value is kept in a {@link ThreadLocal}, so every thread has its
+     * own directory; the first read made by a thread yields {@link #getInitialWorkingDirectory()}.
+     */
+    private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
+        @Override protected Path initialValue() {
+            return getInitialWorkingDirectory();
+        }
+    };
+
+    /**
+     * Maps a Hadoop path onto a file of the local file system.
+     * <p>
+     * Absolute paths are used as is, relative paths are resolved against the current working directory.
+     *
+     * @param path Hadoop path.
+     * @return Local file.
+     */
+    File convert(Path path) {
+        checkPath(path);
+
+        String locPath = path.toUri().getPath();
+
+        if (!path.isAbsolute())
+            return new File(getWorkingDirectory().toUri().getPath(), locPath);
+
+        return new File(locPath);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getHomeDirectory() {
+        return makeQualified(new Path(System.getProperty("user.home")));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getInitialWorkingDirectory() {
+        URI usrDirUri = new File(System.getProperty("user.dir")).getAbsoluteFile().toURI();
+
+        Path usrDir = new Path(usrDirUri);
+
+        return usrDir.makeQualified(getUri(), null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(URI uri, Configuration conf) throws IOException {
+        super.initialize(uri, conf);
+
+        setConf(conf);
+
+        String cfgWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
+
+        if (cfgWorkDir == null)
+            return;
+
+        // A working directory was passed through the configuration: apply it.
+        setWorkingDirectory(new Path(cfgWorkDir));
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation always returns {@link FsConstants#LOCAL_FS_URI}.
+     */
+    @Override public URI getUri() {
+        return FsConstants.LOCAL_FS_URI;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FSDataInputStream open(Path f, int bufferSize) throws 
IOException {
+        return new FSDataInputStream(new InStream(checkExists(convert(f))));
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only {@code overwrite} and {@code bufSize} are honored; {@code permission}, {@code replication},
+     * {@code blockSize} and {@code progress} are not used by this implementation.
+     *
+     * @throws FileAlreadyExistsException If {@code overwrite} is {@code false} and the path already exists.
+     */
+    @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
+        short replication, long blockSize, Progressable progress) throws IOException {
+        File file = convert(f);
+
+        // File.createNewFile() returns false only when the path already exists, so report it with the
+        // dedicated exception type. It is a subclass of IOException, hence existing callers are not affected.
+        if (!overwrite && !file.createNewFile())
+            throw new FileAlreadyExistsException("Failed to create new file: " + f.toUri());
+
+        return out(file, false, bufSize);
+    }
+
+    /**
+     * Opens a buffered output stream over a local file.
+     *
+     * @param file File.
+     * @param append Append flag.
+     * @param bufSize Requested buffer size; values below 32 KB are raised to 32 KB.
+     * @return Output stream.
+     * @throws IOException If failed.
+     */
+    private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
+        FileOutputStream fileOut = new FileOutputStream(file, append);
+
+        int effBufSize = Math.max(bufSize, 32 * 1024);
+
+        return new FSDataOutputStream(new BufferedOutputStream(fileOut, effBufSize),
+            new Statistics(getUri().getScheme()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public FSDataOutputStream append(Path f, int bufSize, 
Progressable progress) throws IOException {
+        return out(convert(f), true, bufSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean rename(Path src, Path dst) throws IOException {
+        return convert(src).renameTo(convert(dst));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(Path f, boolean recursive) throws 
IOException {
+        File file = convert(f);
+
+        if (file.isDirectory() && !recursive)
+            throw new IOException("Failed to remove directory in non recursive 
mode: " + f.toUri());
+
+        return U.delete(file);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The path is validated before anything is stored, so a rejected path leaves the current working
+     * directory untouched.
+     */
+    @Override public void setWorkingDirectory(Path dir) {
+        // Validate first: if checkPath() rejects the path, the previously set working directory must stay
+        // in effect instead of being replaced by the rejected value.
+        checkPath(dir);
+
+        workDir.set(fixRelativePart(dir));
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the value currently held by {@code workDir}.
+     */
+    @Override public Path getWorkingDirectory() {
+        return workDir.get();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The {@code permission} argument is not used by this implementation.
+     */
+    @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+        if (f == null)
+            throw new IllegalArgumentException("mkdirs path arg is null");
+
+        Path parent = f.getParent();
+
+        File dir = convert(f);
+
+        if (parent != null) {
+            File parentDir = convert(parent);
+
+            if (parentDir.exists() && !parentDir.isDirectory())
+                throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
+
+            // Create the ancestors first and give up if that fails.
+            if (!mkdirs(parent))
+                return false;
+        }
+
+        if (dir.mkdir())
+            return true;
+
+        // mkdir() also returns false when the directory is already there, which is a success for mkdirs.
+        return dir.isDirectory();
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileStatus getFileStatus(Path f) throws IOException {
+        return fileStatus(checkExists(convert(f)));
+    }
+
+    /**
+     * Builds Hadoop file status for a local file.
+     *
+     * @param file Local file.
+     * @return File status.
+     * @throws IOException If the target of a symbolic link cannot be read.
+     */
+    private FileStatus fileStatus(File file) throws IOException {
+        boolean dir = file.isDirectory();
+
+        Path symlink = null;
+
+        // Symbolic links are only resolved for non-directories.
+        if (!dir) {
+            java.nio.file.Path nioPath = file.toPath();
+
+            if (Files.isSymbolicLink(nioPath))
+                symlink = new Path(Files.readSymbolicLink(nioPath).toUri());
+        }
+
+        long len = dir ? 0 : file.length();
+
+        // Last modification time is reported for both modification and access time.
+        return new FileStatus(len, dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
+            /*permission*/null, /*owner*/null, /*group*/null, symlink, new Path(file.toURI()));
+    }
+
+    /**
+     * Ensures that a local file exists.
+     *
+     * @param file File.
+     * @return The same file instance, which allows call chaining.
+     * @throws FileNotFoundException If the file does not exist.
+     */
+    private static File checkExists(File file) throws FileNotFoundException {
+        if (file.exists())
+            return file;
+
+        throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For a regular file a single-element array with the status of that file is returned.
+     *
+     * @throws FileNotFoundException If the path does not exist.
+     * @throws IOException If the directory content cannot be read.
+     */
+    @Override public FileStatus[] listStatus(Path f) throws IOException {
+        File file = convert(f);
+
+        if (checkExists(file).isFile())
+            return new FileStatus[] {fileStatus(file)};
+
+        File[] files = file.listFiles();
+
+        // File.listFiles() returns null on an I/O error or when the path is not a directory (for example
+        // because it was removed concurrently); fail with a descriptive error, not a NullPointerException.
+        if (files == null)
+            throw new IOException("Failed to list directory: " + f.toUri());
+
+        FileStatus[] res = new FileStatus[files.length];
+
+        for (int i = 0; i < res.length; i++)
+            res[i] = fileStatus(files[i]);
+
+        return res;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation always returns {@code true}.
+     */
+    @Override public boolean supportsSymlinks() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void createSymlink(Path target, Path link, boolean 
createParent) throws IOException {
+        Files.createSymbolicLink(convert(link).toPath(), 
convert(target).toPath());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For a symbolic link the status of the link target is returned; for any other path the result is the
+     * same as for {@link #getFileStatus(Path)}.
+     */
+    @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
+        // Files.readSymbolicLink() (used by getLinkTarget) throws NotLinkException for regular files and
+        // directories, so the target is only resolved when the path really is a link.
+        if (!Files.isSymbolicLink(convert(f).toPath()))
+            return getFileStatus(f);
+
+        return getFileStatus(getLinkTarget(f));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getLinkTarget(Path f) throws IOException {
+        java.nio.file.Path link = convert(f).toPath();
+
+        java.nio.file.Path target = Files.readSymbolicLink(link);
+
+        return new Path(target.toFile().toURI());
+    }
+
+    /**
+     * Input stream over a local file that supports seeking and positioned reads.
+     * <p>
+     * All operations work on a single {@link RandomAccessFile}; every method that touches the file pointer
+     * is synchronized on the stream.
+     */
+    private static class InStream extends InputStream implements Seekable, PositionedReadable {
+        /** Underlying file, opened in read-only mode. */
+        private final RandomAccessFile file;
+
+        /**
+         * @param f File.
+         * @throws IOException If failed.
+         */
+        public InStream(File f) throws IOException {
+            file = new RandomAccessFile(f, "r");
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized int read() throws IOException {
+            return file.read();
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
+            return file.read(b, off, len);
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void close() throws IOException {
+            file.close();
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * The position of the stream is restored after the read, even when the read fails.
+         */
+        @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
+            long pos0 = file.getFilePointer();
+
+            try {
+                file.seek(pos);
+
+                return file.read(buf, off, len);
+            }
+            finally {
+                // Positioned reads must not move the stream, whatever the outcome of the read is.
+                file.seek(pos0);
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * The position of the stream is restored after the read, even when the read fails.
+         */
+        @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+            long pos0 = file.getFilePointer();
+
+            try {
+                file.seek(pos);
+
+                // A single read() may legally return fewer bytes than requested. RandomAccessFile.readFully()
+                // keeps reading until the range is filled and throws EOFException if the file ends first.
+                file.readFully(buf, off, len);
+            }
+            finally {
+                file.seek(pos0);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFully(long pos, byte[] buf) throws IOException {
+            readFully(pos, buf, 0, buf.length);
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void seek(long pos) throws IOException {
+            file.seek(pos);
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized long getPos() throws IOException {
+            return file.getFilePointer();
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * This implementation always returns {@code false}.
+         */
+        @Override public boolean seekToNewSource(long targetPos) throws IOException {
+            return false;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfs.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfs.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfs.java
new file mode 100644
index 0000000..fe43596
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfs.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathSummary;
+import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Facade for communication with grid.
+ * <p>
+ * Every operation declared here returns its result directly; failures are reported through
+ * {@link IgniteCheckedException} or {@link IOException}.
+ */
+public interface HadoopIgfs {
+    /**
+     * Perform handshake.
+     *
+     * @param logDir Log directory.
+     * @return Handshake response.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public IgfsHandshakeResponse handshake(String logDir) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Close connection.
+     *
+     * @param force Force flag.
+     */
+    public void close(boolean force);
+
+    /**
+     * Command to retrieve file info for some IGFS path.
+     *
+     * @param path Path to get file info for.
+     * @return File info.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public IgfsFile info(IgfsPath path) throws IgniteCheckedException, 
IOException;
+
+    /**
+     * Command to update file properties.
+     *
+     * @param path IGFS path to update properties.
+     * @param props Properties to update.
+     * @return File info produced by the update operation.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public IgfsFile update(IgfsPath path, Map<String, String> props) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Sets last access time and last modification time for a file.
+     *
+     * @param path Path to update times.
+     * @param accessTime Last access time to set.
+     * @param modificationTime Last modification time to set.
+     * @return Operation result (exact meaning is implementation-defined and not visible here, TODO confirm).
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public Boolean setTimes(IgfsPath path, long accessTime, long 
modificationTime) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Command to rename given path.
+     *
+     * @param src Source path.
+     * @param dest Destination path.
+     * @return Result of the rename operation.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public Boolean rename(IgfsPath src, IgfsPath dest) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to delete given path.
+     *
+     * @param path Path to delete.
+     * @param recursive {@code True} if deletion is recursive.
+     * @return Result of the delete operation.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public Boolean delete(IgfsPath path, boolean recursive) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to get affinity for given path, offset and length.
+     *
+     * @param path Path to get affinity for.
+     * @param start Start position (offset).
+     * @param len Data length.
+     * @return Block locations for the requested range.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, 
long len) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Gets path summary.
+     *
+     * @param path Path to get summary for.
+     * @return Path summary.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public IgfsPathSummary contentSummary(IgfsPath path) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to create directories.
+     *
+     * @param path Path to create.
+     * @param props Properties passed along with the request (presumably directory properties, TODO confirm).
+     * @return Result of the mkdirs operation.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to get list of files in directory.
+     *
+     * @param path Path to list.
+     * @return Files found under the path.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public Collection<IgfsFile> listFiles(IgfsPath path) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to get directory listing.
+     *
+     * @param path Path to list.
+     * @return Paths found under the path.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public Collection<IgfsPath> listPaths(IgfsPath path) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Performs status request.
+     *
+     * @return Status response.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public IgfsStatus fsStatus() throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to open file for reading.
+     *
+     * @param path File path to open.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public HadoopIgfsStreamDelegate open(IgfsPath path) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to open file for reading with an explicit prefetch setting.
+     *
+     * @param path File path to open.
+     * @param seqReadsBeforePrefetch Presumably the number of sequential reads after which prefetching starts
+     *      (inferred from the name, TODO confirm).
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public HadoopIgfsStreamDelegate open(IgfsPath path, int 
seqReadsBeforePrefetch) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Command to create file and open it for output.
+     *
+     * @param path Path to file.
+     * @param overwrite If {@code true} then old file contents will be lost.
+     * @param colocate If {@code true} and called on data node, file will be written on that node.
+     * @param replication Replication factor.
+     * @param blockSize Block size.
+     * @param props File properties for creation.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, 
boolean colocate,
+        int replication, long blockSize, @Nullable Map<String, String> props) 
throws IgniteCheckedException, IOException;
+
+    /**
+     * Open file for output appending data to the end of a file.
+     *
+     * @param path Path to file.
+     * @param create If {@code true}, file will be created if does not exist.
+     * @param props File properties.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed.
+     */
+    public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
+        @Nullable Map<String, String> props) throws IgniteCheckedException, 
IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsCommunicationException.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsCommunicationException.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsCommunicationException.java
new file mode 100644
index 0000000..d610091
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsCommunicationException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Communication exception indicating a problem between file system and IGFS instance.
+ */
+public class HadoopIgfsCommunicationException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates new exception with given throwable as a nested cause and
+     * source of error message.
+     *
+     * @param cause Non-null throwable cause.
+     */
+    public HadoopIgfsCommunicationException(Exception cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates a new exception with given error message and no nested cause.
+     *
+     * @param msg Error message.
+     */
+    public HadoopIgfsCommunicationException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates a new exception with given error message and nested cause exception.
+     *
+     * @param msg Error message.
+     * @param cause Cause.
+     */
+    public HadoopIgfsCommunicationException(String msg, Exception cause) {
+        super(msg, cause);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java
new file mode 100644
index 0000000..014e2a1
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import java.io.IOException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extended IGFS server interface: adds stream-level operations (event listeners, data reads and writes,
+ * flush and close) on top of {@link HadoopIgfs}.
+ */
+public interface HadoopIgfsEx extends HadoopIgfs {
+    /**
+     * Adds event listener that will be invoked when connection with server is lost or remote error has
+     * occurred. If connection is closed already, callback will be invoked synchronously inside this method.
+     *
+     * @param delegate Stream delegate.
+     * @param lsnr Event listener.
+     */
+    public void addEventListener(HadoopIgfsStreamDelegate delegate, 
HadoopIgfsStreamEventListener lsnr);
+
+    /**
+     * Removes event listener that will be invoked when connection with server is lost or remote error has
+     * occurred.
+     *
+     * @param delegate Stream delegate.
+     */
+    public void removeEventListener(HadoopIgfsStreamDelegate delegate);
+
+    /**
+     * Asynchronously reads specified amount of bytes from opened input stream.
+     *
+     * @param delegate Stream delegate.
+     * @param pos Position to read from.
+     * @param len Data length to read.
+     * @param outBuf Optional output buffer. If buffer length is less than {@code len}, all remaining
+     *     bytes will be read into new allocated buffer of length {@code len - outBuf.length} and this
+     *     buffer will be the result of read future.
+     * @param outOff Output offset.
+     * @param outLen Output length.
+     * @return Future with read data.
+     */
+    public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate 
delegate, long pos, int len,
+        @Nullable final byte[] outBuf, final int outOff, final int outLen);
+
+    /**
+     * Writes data to the stream identified by the given delegate. This method does not return any future
+     * since no response to write request is sent.
+     *
+     * @param delegate Stream delegate.
+     * @param data Data to write.
+     * @param off Offset.
+     * @param len Length.
+     * @throws IOException If failed.
+     */
+    public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int 
off, int len) throws IOException;
+
+    /**
+     * Close server stream.
+     *
+     * @param delegate Stream delegate.
+     * @throws IOException If failed.
+     */
+    public void closeStream(HadoopIgfsStreamDelegate delegate) throws 
IOException;
+
+    /**
+     * Flush output stream.
+     *
+     * @param delegate Stream delegate.
+     * @throws IOException If failed.
+     */
+    public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+    /**
+     * The user this Igfs instance works on behalf of.
+     *
+     * @return The user name.
+     */
+    public String user();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsFuture.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsFuture.java
new file mode 100644
index 0000000..5ff1b2e
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsFuture.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * IGFS client future that additionally carries the parameters of an output buffer (array, offset, length)
+ * and a flag marking the future as a read future.
+ * <p>
+ * The extra fields are plain, unsynchronized fields accessed through the getter/setter pairs below.
+ */
+public class HadoopIgfsFuture<T> extends GridFutureAdapter<T> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Output buffer. */
+    private byte[] outBuf;
+
+    /** Output offset. */
+    private int outOff;
+
+    /** Output length. */
+    private int outLen;
+
+    /** Read future flag. */
+    private boolean read;
+
+    /**
+     * @return Output buffer, possibly {@code null} if none was set.
+     */
+    public byte[] outputBuffer() {
+        return outBuf;
+    }
+
+    /**
+     * @param outBuf Output buffer.
+     */
+    public void outputBuffer(@Nullable byte[] outBuf) {
+        this.outBuf = outBuf;
+    }
+
+    /**
+     * @return Offset in output buffer to write from.
+     */
+    public int outputOffset() {
+        return outOff;
+    }
+
+    /**
+     * @param outOff Offset in output buffer to write from.
+     */
+    public void outputOffset(int outOff) {
+        this.outOff = outOff;
+    }
+
+    /**
+     * @return Length to write to output buffer.
+     */
+    public int outputLength() {
+        return outLen;
+    }
+
+    /**
+     * @param outLen Length to write to output buffer.
+     */
+    public void outputLength(int outLen) {
+        this.outLen = outLen;
+    }
+
+    /**
+     * @param read {@code True} if this is a read future.
+     */
+    public void read(boolean read) {
+        this.read = read;
+    }
+
+    /**
+     * @return {@code True} if this is a read future.
+     */
+    public boolean read() {
+        return read;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
new file mode 100644
index 0000000..3220538
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsInputStream;
+import org.apache.ignite.igfs.IgfsOutputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathSummary;
+import org.apache.ignite.igfs.IgfsUserContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Communication with grid in the same process.
+ */
+public class HadoopIgfsInProc implements HadoopIgfsEx {
+    /** Target IGFS. */
+    private final IgfsEx igfs;
+
+    /** Buffer size. */
+    private final int bufSize;
+
+    /** Event listeners. */
+    private final Map<HadoopIgfsStreamDelegate, HadoopIgfsStreamEventListener> 
lsnrs =
+        new ConcurrentHashMap<>();
+
+    /** Logger. */
+    private final Log log;
+
+    /** The user this Igfs works on behalf of. */
+    private final String user;
+
+    /**
+     * Creates an in-process IGFS endpoint.
+     *
+     * @param igfs Target IGFS.
+     * @param log Log.
+     * @param userName User name; it is stored after being passed through {@code IgfsUtils.fixUserName}.
+     * @throws IgniteCheckedException Declared by the signature.
+     */
+    public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
+        this.igfs = igfs;
+        this.log = log;
+
+        user = IgfsUtils.fixUserName(userName);
+
+        // Buffer size is twice the block size taken from the IGFS configuration.
+        bufSize = 2 * igfs.configuration().getBlockSize();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Executed through {@code IgfsUserContext.doAs} for {@link #user}.
+     */
+    @Override public IgfsHandshakeResponse handshake(final String logDir) {
+        IgniteOutClosure<IgfsHandshakeResponse> op = new IgniteOutClosure<IgfsHandshakeResponse>() {
+            @Override public IgfsHandshakeResponse apply() {
+                // Pass the client log directory to IGFS before the response is assembled.
+                igfs.clientLogDirectory(logDir);
+
+                return new IgfsHandshakeResponse(
+                    igfs.name(),
+                    igfs.proxyPaths(),
+                    igfs.groupBlockSize(),
+                    igfs.globalSampling());
+            }
+        };
+
+        return IgfsUserContext.doAs(user, op);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code onClose()} on every registered stream event listener. The {@code force} flag is not
+     * consulted by this implementation.
+     */
+    @Override public void close(boolean force) {
+        for (Map.Entry<HadoopIgfsStreamDelegate, HadoopIgfsStreamEventListener> entry : lsnrs.entrySet()) {
+            HadoopIgfsStreamEventListener target = entry.getValue();
+
+            try {
+                target.onClose();
+            }
+            catch (IgniteCheckedException err) {
+                // A failing listener is only logged so that the remaining listeners are still notified.
+                if (log.isDebugEnabled())
+                    log.debug("Failed to notify stream event listener", err);
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Executed through {@code IgfsUserContext.doAs} for {@link #user}.
+     */
+    @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException {
+        IgniteOutClosure<IgfsFile> op = new IgniteOutClosure<IgfsFile>() {
+            @Override public IgfsFile apply() {
+                return igfs.info(path);
+            }
+        };
+
+        try {
+            return IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to get file info because Grid is stopping: " + path);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Executed through {@code IgfsUserContext.doAs} for {@link #user}.
+     */
+    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props)
+        throws IgniteCheckedException {
+        IgniteOutClosure<IgfsFile> op = new IgniteOutClosure<IgfsFile>() {
+            @Override public IgfsFile apply() {
+                return igfs.update(path, props);
+            }
+        };
+
+        try {
+            return IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to update file because Grid is stopping: " + path);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Executed through {@code IgfsUserContext.doAs} for {@link #user}; returns {@code true} whenever the
+     * call completes without an exception.
+     */
+    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime)
+        throws IgniteCheckedException {
+        IgniteOutClosure<Void> op = new IgniteOutClosure<Void>() {
+            @Override public Void apply() {
+                igfs.setTimes(path, accessTime, modificationTime);
+
+                return null;
+            }
+        };
+
+        try {
+            IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to set path times because Grid is stopping: " + path);
+        }
+
+        return Boolean.TRUE;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Executed through {@code IgfsUserContext.doAs} for {@link #user}; returns {@code true} whenever the
+     * call completes without an exception.
+     */
+    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException {
+        IgniteOutClosure<Void> op = new IgniteOutClosure<Void>() {
+            @Override public Void apply() {
+                igfs.rename(src, dest);
+
+                return null;
+            }
+        };
+
+        try {
+            IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Only the source path is included in the message.
+            throw new HadoopIgfsCommunicationException(
+                "Failed to rename path because Grid is stopping: " + src);
+        }
+
+        return Boolean.TRUE;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Executed through {@code IgfsUserContext.doAs} for {@link #user}; the result of {@code igfs.delete}
+     * is returned as is.
+     */
+    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
+        IgniteOutClosure<Boolean> op = new IgniteOutClosure<Boolean>() {
+            @Override public Boolean apply() {
+                return igfs.delete(path, recursive);
+            }
+        };
+
+        try {
+            return IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to delete path because Grid is stopping: " + path);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Executed through the {@link Callable} flavor of {@code IgfsUserContext.doAs} for {@link #user},
+     * because {@code igfs.globalSpace()} may throw a checked exception.
+     *
+     * @throws IgniteCheckedException If the status request failed; an {@link IllegalStateException} is
+     *      reported as {@link HadoopIgfsCommunicationException}.
+     */
+    @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
+        try {
+            return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
+                @Override public IgfsStatus call() throws IgniteCheckedException {
+                    return igfs.globalSpace();
+                }
+            });
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
+                "stopping.");
+        }
+        catch (IgniteCheckedException | RuntimeException | Error e) {
+            // Everything the callable can legitimately throw is propagated untouched.
+            throw e;
+        }
+        catch (Exception e) {
+            // The callable declares only IgniteCheckedException, so any other checked exception is a
+            // programming error; keep it as the cause so that it is not lost.
+            throw new AssertionError("Must never go there.", e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Executed through {@code IgfsUserContext.doAs} for {@link #user}.
+     */
+    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException {
+        IgniteOutClosure<Collection<IgfsPath>> op = new IgniteOutClosure<Collection<IgfsPath>>() {
+            @Override public Collection<IgfsPath> apply() {
+                return igfs.listPaths(path);
+            }
+        };
+
+        try {
+            return IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to list paths because Grid is stopping: " + path);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Executed through {@code IgfsUserContext.doAs} for {@link #user}.
+     */
+    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException {
+        IgniteOutClosure<Collection<IgfsFile>> op = new IgniteOutClosure<Collection<IgfsFile>>() {
+            @Override public Collection<IgfsFile> apply() {
+                return igfs.listFiles(path);
+            }
+        };
+
+        try {
+            return IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to list files because Grid is stopping: " + path);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Executed through {@code IgfsUserContext.doAs} for {@link #user}; returns {@code true} whenever the
+     * call completes without an exception.
+     */
+    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props)
+        throws IgniteCheckedException {
+        IgniteOutClosure<Void> op = new IgniteOutClosure<Void>() {
+            @Override public Void apply() {
+                igfs.mkdirs(path, props);
+
+                return null;
+            }
+        };
+
+        try {
+            IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to create directory because Grid is stopping: " + path);
+        }
+
+        return Boolean.TRUE;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@code igfs.summary(path)} through {@code IgfsUserContext.doAs} for {@link #user}.
+     */
+    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException {
+        IgniteOutClosure<IgfsPathSummary> op = new IgniteOutClosure<IgfsPathSummary>() {
+            @Override public IgfsPathSummary apply() {
+                return igfs.summary(path);
+            }
+        };
+
+        try {
+            return IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to get content summary because Grid is stopping: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath 
path, final long start, final long len)
+        throws IgniteCheckedException {
+        try {
+            return IgfsUserContext.doAs(user, new 
IgniteOutClosure<Collection<IgfsBlockLocation>>() {
+                @Override public Collection<IgfsBlockLocation> apply() {
+                    return igfs.affinity(path, start, len);
+                }
+            });
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException("Failed to get affinity 
because Grid is stopping: " + path);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Opens the file with {@link #bufSize} as buffer size and wraps the stream, together with its length,
+     * into a delegate bound to this instance.
+     */
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException {
+        IgniteOutClosure<HadoopIgfsStreamDelegate> op = new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply() {
+                IgfsInputStream in = igfs.open(path, bufSize);
+
+                return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, in, in.length());
+            }
+        };
+
+        try {
+            return IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to open file because Grid is stopping: " + path);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Same as the single-argument variant, but additionally forwards {@code seqReadsBeforePrefetch} to
+     * {@code igfs.open}.
+     */
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
+        throws IgniteCheckedException {
+        IgniteOutClosure<HadoopIgfsStreamDelegate> op = new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply() {
+                IgfsInputStream in = igfs.open(path, bufSize, seqReadsBeforePrefetch);
+
+                return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, in, in.length());
+            }
+        };
+
+        try {
+            return IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to open file because Grid is stopping: " + path);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * When {@code colocate} is set, {@code igfs.nextAffinityKey()} is passed as affinity key; otherwise
+     * {@code null} is passed.
+     */
+    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite,
+        final boolean colocate, final int replication, final long blockSize,
+        final @Nullable Map<String, String> props) throws IgniteCheckedException {
+        IgniteOutClosure<HadoopIgfsStreamDelegate> op = new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply() {
+                IgfsOutputStream out = igfs.create(
+                    path,
+                    bufSize,
+                    overwrite,
+                    colocate ? igfs.nextAffinityKey() : null,
+                    replication,
+                    blockSize,
+                    props);
+
+                return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, out);
+            }
+        };
+
+        try {
+            return IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to create file because Grid is stopping: " + path);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Opens the file for appending with {@link #bufSize} as buffer size and wraps the stream into a
+     * delegate bound to this instance.
+     */
+    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+        final @Nullable Map<String, String> props) throws IgniteCheckedException {
+        IgniteOutClosure<HadoopIgfsStreamDelegate> op = new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply() {
+                IgfsOutputStream out = igfs.append(path, bufSize, create, props);
+
+                return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, out);
+            }
+        };
+
+        try {
+            return IgfsUserContext.doAs(user, op);
+        }
+        catch (IgniteException e) {
+            // Unchecked Ignite failure is wrapped into the checked type declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new HadoopIgfsCommunicationException(
+                "Failed to append file because Grid is stopping: " + path);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The read is performed synchronously and an already finished future is returned. If {@code outBuf}
+     * is given, bytes are placed into it starting at {@code outOff}; bytes that do not fit into the tail
+     * of {@code outBuf} are returned as the future result. Without {@code outBuf} all {@code len} bytes
+     * are returned as the future result.
+     * <p>
+     * NOTE(review): {@code outLen} is not consulted here; the room in {@code outBuf} is derived from
+     * {@code outBuf.length - outOff} - confirm this matches the interface contract.
+     *
+     * @param delegate Stream delegate whose target is the input stream to read from.
+     * @param pos Position in the stream to read from.
+     * @param len Number of bytes to read.
+     * @param outBuf Optional output buffer.
+     * @param outOff Offset in the output buffer to write from.
+     * @param outLen Length available in the output buffer (unused, see note above).
+     * @return Finished future holding the bytes that were not written to {@code outBuf} (possibly
+     *      {@code null}), or finished with the error that occurred.
+     */
+    @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
+        @Nullable byte[] outBuf, int outOff, int outLen) {
+        IgfsInputStream stream = delegate.target();
+
+        try {
+            byte[] res = null;
+
+            if (outBuf != null) {
+                int outTailLen = outBuf.length - outOff;
+
+                if (len <= outTailLen)
+                    stream.readFully(pos, outBuf, outOff, len);
+                else {
+                    // First fill whatever room is left in the caller's buffer.
+                    stream.readFully(pos, outBuf, outOff, outTailLen);
+
+                    int remainderLen = len - outTailLen;
+
+                    res = new byte[remainderLen];
+
+                    // The remainder continues right after the bytes already placed into outBuf;
+                    // reading from 'pos' again would return the beginning of the range twice.
+                    stream.readFully(pos + outTailLen, res, 0, remainderLen);
+                }
+            } else {
+                res = new byte[len];
+
+                stream.readFully(pos, res, 0, len);
+            }
+
+            return new GridFinishedFuture<>(res);
+        }
+        catch (IllegalStateException | IOException e) {
+            // Let the stream owner know about the failure before handing the error to the caller.
+            HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate);
+
+            if (lsnr != null)
+                lsnr.onError(e.getMessage());
+
+            return new GridFinishedFuture<>(e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On failure the listener registered for the delegate (if any) receives the error message. An
+     * {@link IllegalStateException} is wrapped into an {@link IOException}; an {@link IOException} is
+     * re-thrown as is.
+     */
+    @Override public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len)
+        throws IOException {
+        try {
+            IgfsOutputStream out = delegate.target();
+
+            out.write(data, off, len);
+        }
+        catch (IllegalStateException | IOException e) {
+            HadoopIgfsStreamEventListener errLsnr = lsnrs.get(delegate);
+
+            if (errLsnr != null)
+                errLsnr.onError(e.getMessage());
+
+            if (!(e instanceof IllegalStateException))
+                throw e;
+
+            throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On failure the listener registered for the delegate (if any) receives the error message. An
+     * {@link IllegalStateException} is wrapped into an {@link IOException}; an {@link IOException} is
+     * re-thrown as is.
+     */
+    @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
+        try {
+            IgfsOutputStream out = delegate.target();
+
+            out.flush();
+        }
+        catch (IllegalStateException | IOException e) {
+            HadoopIgfsStreamEventListener errLsnr = lsnrs.get(delegate);
+
+            if (errLsnr != null)
+                errLsnr.onError(e.getMessage());
+
+            if (!(e instanceof IllegalStateException))
+                throw e;
+
+            throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Closes the delegate's target. An {@link IllegalStateException} raised while closing is wrapped
+     * into an {@link IOException}; no listener is notified or removed here.
+     */
+    @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException {
+        final Closeable target = desc.target();
+
+        try {
+            target.close();
+        }
+        catch (IllegalStateException err) {
+            throw new IOException("Failed to close IGFS stream because Grid is stopping.", err);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Registers the listener under the given delegate; an assertion checks that no different listener
+     * was registered for that delegate before.
+     */
+    @Override public void addEventListener(HadoopIgfsStreamDelegate delegate,
+        HadoopIgfsStreamEventListener lsnr) {
+        HadoopIgfsStreamEventListener prev = lsnrs.put(delegate, lsnr);
+
+        // Either nothing was registered, or the very same listener is being registered again.
+        assert !(prev != null && prev != lsnr);
+
+        if (log.isDebugEnabled())
+            log.debug("Added stream event listener [delegate=" + delegate + ']');
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Removes the listener registered for the delegate, if any; the debug message is written only when
+     * something was actually removed.
+     */
+    @Override public void removeEventListener(HadoopIgfsStreamDelegate delegate) {
+        HadoopIgfsStreamEventListener removed = lsnrs.remove(delegate);
+
+        if (removed != null) {
+            if (log.isDebugEnabled())
+                log.debug("Removed stream event listener [delegate=" + delegate + ']');
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the user name stored by the constructor.
+     */
+    @Override public String user() {
+        return this.user;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInputStream.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInputStream.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInputStream.java
new file mode 100644
index 0000000..46b46d7
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInputStream.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.igfs.common.IgfsLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * IGFS input stream wrapper for hadoop interfaces.
+ */
+@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+public final class HadoopIgfsInputStream extends InputStream implements 
Seekable, PositionedReadable,
+    HadoopIgfsStreamEventListener {
+    /** Minimum buffer size. */
+    private static final int MIN_BUF_SIZE = 4 * 1024;
+
+    /** Server stream delegate. */
+    private HadoopIgfsStreamDelegate delegate;
+
+    /** Stream ID used by logger. */
+    private long logStreamId;
+
+    /** Stream position. */
+    private long pos;
+
+    /** Stream read limit. */
+    private long limit;
+
+    /** Mark position. */
+    private long markPos = -1;
+
+    /** Prefetch buffer. */
+    private DoubleFetchBuffer buf = new DoubleFetchBuffer();
+
+    /** Buffer half size for double-buffering. */
+    private int bufHalfSize;
+
+    /** Closed flag. */
+    private volatile boolean closed;
+
+    /** Flag set if stream was closed due to connection breakage. */
+    private boolean connBroken;
+
+    /** Logger. */
+    private Log log;
+
+    /** Client logger. */
+    private IgfsLogger clientLog;
+
+    /** Read time. */
+    private long readTime;
+
+    /** User time. */
+    private long userTime;
+
+    /** Last timestamp. */
+    private long lastTs;
+
+    /** Amount of read bytes. */
+    private long total;
+
+    /**
+     * Creates input stream and registers it as event listener for the given delegate.
+     *
+     * @param delegate Server stream delegate.
+     * @param limit Read limit, must not be negative.
+     * @param bufSize Buffer size; values below {@link #MIN_BUF_SIZE} are raised to it.
+     * @param log Log.
+     * @param clientLog Client logger.
+     * @param logStreamId Stream ID used by logger.
+     */
+    public HadoopIgfsInputStream(HadoopIgfsStreamDelegate delegate, long limit, int bufSize, Log log,
+        IgfsLogger clientLog, long logStreamId) {
+        assert limit >= 0;
+
+        this.log = log;
+        this.clientLog = clientLog;
+        this.logStreamId = logStreamId;
+        this.limit = limit;
+        this.delegate = delegate;
+
+        bufHalfSize = bufSize < MIN_BUF_SIZE ? MIN_BUF_SIZE : bufSize;
+
+        // Starting point for the user/read time accounting.
+        lastTs = System.nanoTime();
+
+        delegate.hadoop().addEventListener(delegate, this);
+    }
+
+    /**
+     * Read start.
+     */
+    private void readStart() {
+        long now = System.nanoTime();
+
+        userTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /**
+     * Read end.
+     */
+    private void readEnd() {
+        long now = System.nanoTime();
+
+        readTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns {@code -1} when {@code eof()} reports the end of the stream; otherwise takes one value from
+     * the prefetch buffer at the current position and advances the position by one.
+     */
+    @Override public synchronized int read() throws IOException {
+        checkClosed();
+
+        readStart();
+
+        try {
+            if (!eof()) {
+                buf.refreshAhead(pos);
+
+                int val = buf.atPosition(pos);
+
+                pos++;
+                total++;
+
+                // Refresh again for the advanced position.
+                buf.refreshAhead(pos);
+
+                return val;
+            }
+
+            return -1;
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Bytes are first taken from the prefetch buffer; whatever is still missing (bounded by the read
+     * limit) is requested through {@code readData} directly into {@code b}.
+     */
+    @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException {
+        checkClosed();
+
+        if (eof())
+            return -1;
+
+        readStart();
+
+        try {
+            // Part served by the prefetch buffer.
+            int done = buf.flatten(b, pos, off, len);
+
+            pos += done;
+            total += done;
+
+            long left = limit - pos;
+
+            if (left > 0 && done != len) {
+                int missing = len - done;
+                int chunk = (int)Math.min(left, missing);
+
+                // The returned future is awaited; its value is not used.
+                delegate.hadoop().readData(delegate, pos, chunk, b, off + done, missing).get();
+
+                done += chunk;
+                pos += chunk;
+                total += chunk;
+            }
+
+            buf.refreshAhead(pos);
+
+            return done;
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The position is moved by {@code n} but never beyond the read limit.
+     *
+     * @param n Number of bytes to skip.
+     * @return Difference between the new and the old position.
+     * @throws IOException If the stream is closed.
+     */
+    @Override public synchronized long skip(long n) throws IOException {
+        checkClosed();
+
+        if (clientLog.isLogEnabled())
+            clientLog.logSkip(logStreamId, n);
+
+        long oldPos = pos;
+
+        // Compare against the remaining distance instead of computing 'pos + n': the sum overflows for
+        // very large n (e.g. Long.MAX_VALUE) and would turn the position negative.
+        if (n <= limit - pos)
+            pos += n;
+        else
+            pos = limit;
+
+        buf.refreshAhead(pos);
+
+        return pos - oldPos;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reports what the prefetch buffer returns from {@code available} for the current position.
+     */
+    @Override public synchronized int available() throws IOException {
+        checkClosed();
+
+        final int cnt = buf.available(pos);
+
+        assert cnt >= 0;
+
+        return cnt;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Does nothing if the stream is already closed. Otherwise closes the server stream through the
+     * delegate, writes the close record to the client log (when enabled) and marks this stream closed.
+     */
+    @Override public synchronized void close() throws IOException {
+        if (closed)
+            return;
+
+        readStart();
+
+        if (log.isDebugEnabled())
+            log.debug("Closing input stream: " + delegate);
+
+        delegate.hadoop().closeStream(delegate);
+
+        readEnd();
+
+        if (clientLog.isLogEnabled())
+            clientLog.logCloseIn(logStreamId, userTime, readTime, total);
+
+        markClosed(false);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime +
+                ", userTime=" + userTime + ']');
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Remembers the current position. {@code readLimit} is only written to the client log.
+     */
+    @Override public synchronized void mark(int readLimit) {
+        this.markPos = this.pos;
+
+        if (clientLog.isLogEnabled()) {
+            clientLog.logMark(logStreamId, readLimit);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Moves the position back to the marked one; fails if no mark was set ({@code markPos == -1}).
+     */
+    @Override public synchronized void reset() throws IOException {
+        checkClosed();
+
+        if (clientLog.isLogEnabled())
+            clientLog.logReset(logStreamId);
+
+        final long marked = markPos;
+
+        if (marked == -1)
+            throw new IOException("Stream was not marked.");
+
+        pos = marked;
+
+        buf.refreshAhead(marked);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Always {@code true}.
+     */
+    @Override public boolean markSupported() {
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads at most {@code len} bytes starting at {@code position}, bounded by the read limit, by
+     * delegating to {@link #readFully(long, byte[], int, int)}. The current stream position is not
+     * modified by this method.
+     *
+     * @param position Position to read from.
+     * @param buf Destination buffer.
+     * @param off Offset in the destination buffer.
+     * @param len Maximum number of bytes to read.
+     * @return Number of bytes read, or {@code -1} if nothing can be read.
+     * @throws IOException If read failed.
+     */
+    @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException {
+        long remaining = limit - position;
+
+        // Position at or beyond the limit: report EOF. Without this check a negative 'remaining' would
+        // be narrowed to int and passed on to readFully as a bogus length.
+        if (remaining <= 0)
+            return -1;
+
+        int read = (int)Math.min(len, remaining);
+
+        // Nothing requested.
+        if (read == 0)
+            return -1;
+
+        readFully(position, buf, off, read);
+
+        return read;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Fails with {@link EOFException} if {@code len} bytes are not available before the read limit.
+     * Bytes are taken from the prefetch buffer where possible and the rest is requested through
+     * {@code readData}. The current stream position is not modified by this method.
+     */
+    @Override public synchronized void readFully(long position, byte[] buf, int off, int len)
+        throws IOException {
+        checkClosed();
+
+        long left = limit - position;
+
+        if (len > left)
+            throw new EOFException("End of stream reached before data was fully read.");
+
+        readStart();
+
+        try {
+            // Part served by the prefetch buffer.
+            int cached = this.buf.flatten(buf, position, off, len);
+
+            total += cached;
+
+            if (cached != len) {
+                int missing = len - cached;
+
+                // The returned future is awaited; its value is not used.
+                delegate.hadoop().readData(delegate, position + cached, missing, buf, off + cached, missing)
+                    .get();
+
+                total += missing;
+            }
+
+            if (clientLog.isLogEnabled())
+                clientLog.logRandomRead(logStreamId, position, len);
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFully(long position, byte[] buf) throws IOException {
+        // Fill the whole array via the ranged overload.
+        readFully(position, buf, 0, buf.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void seek(long pos) throws IOException {
+        A.ensure(pos >= 0, "position must be non-negative");
+
+        checkClosed();
+
+        // The client log receives the requested position, before the clamping below.
+        if (clientLog.isLogEnabled())
+            clientLog.logSeek(logStreamId, pos);
+
+        // A position past the end of the stream is clamped to the limit instead of being rejected.
+        if (pos > limit)
+            pos = limit;
+
+        if (log.isDebugEnabled())
+            log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']');
+
+        this.pos = pos;
+
+        // Let the prefetch buffer decide whether it has to refetch or flip for the new position.
+        buf.refreshAhead(pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long getPos() {
+        // Current stream position, as last set by seek() or reset() (and by code outside this excerpt).
+        return pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean seekToNewSource(long targetPos) {
+        // The target position is ignored: this implementation never switches to another source.
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() {
+        // Close with connBroken = true, so that later checkClosed() calls fail with "Server connection was lost.".
+        markClosed(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onError(String errMsg) {
+        // No-op: the error message is ignored by this stream.
+    }
+
+    /**
+     * Moves the stream into the closed state and unregisters its event listener. Does nothing if the stream
+     * is already closed.
+     *
+     * @param connBroken {@code True} if connection with server was lost.
+     */
+    private void markClosed(boolean connBroken) {
+        // It is ok to have race here.
+        if (closed)
+            return;
+
+        closed = true;
+
+        this.connBroken = connBroken;
+
+        delegate.hadoop().removeEventListener(delegate);
+    }
+
+    /**
+     * Ensures that the stream is still open.
+     *
+     * @throws IOException If the stream is closed; the message tells whether the connection was lost or the
+     *      stream was simply closed.
+     */
+    private void checkClosed() throws IOException {
+        if (!closed)
+            return;
+
+        throw new IOException(connBroken ? "Server connection was lost." : "Stream is closed.");
+    }
+
+    /**
+     * @return {@code True} if the current position equals the stream limit.
+     */
+    private boolean eof() {
+        return pos == limit;
+    }
+
+    /**
+     * Asynchronous prefetch buffer: a file region described by its position and length, together with the
+     * future that yields the bytes of that region.
+     */
+    private static class FetchBufferPart {
+        /** Read future. */
+        private IgniteInternalFuture<byte[]> readFut;
+
+        /** Position of cached chunk in file. */
+        private long pos;
+
+        /** Prefetch length. Need to store as read future result might be not available yet. */
+        private int len;
+
+        /**
+         * Creates fetch buffer part.
+         *
+         * @param readFut Read future for this buffer.
+         * @param pos Read position.
+         * @param len Chunk length.
+         */
+        private FetchBufferPart(IgniteInternalFuture<byte[]> readFut, long pos, int len) {
+            this.readFut = readFut;
+            this.pos = pos;
+            this.len = len;
+        }
+
+        /**
+         * Copies cached data if specified position matches cached region. Obtains the result of the read
+         * future, so the call may have to wait until the data arrives.
+         *
+         * @param dst Destination buffer.
+         * @param pos Read position in file.
+         * @param dstOff Offset in destination buffer from which start writing.
+         * @param len Maximum number of bytes to copy.
+         * @return Number of bytes copied, {@code 0} if the position is outside of this part.
+         * @throws IgniteCheckedException If read future failed.
+         */
+        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
+            // If read start position is within cached boundaries.
+            if (contains(pos)) {
+                byte[] data = readFut.get();
+
+                // Offset of the requested position inside the cached chunk.
+                int srcPos = (int)(pos - this.pos);
+
+                // Never copy more than the chunk holds past that offset.
+                int cpLen = Math.min(len, data.length - srcPos);
+
+                U.arrayCopy(data, srcPos, dst, dstOff, cpLen);
+
+                return cpLen;
+            }
+
+            return 0;
+        }
+
+        /**
+         * @return {@code True} if data is ready to be read.
+         */
+        public boolean ready() {
+            return readFut.isDone();
+        }
+
+        /**
+         * Checks if current buffer part contains given position.
+         *
+         * @param pos Position to check.
+         * @return {@code True} if position lies within {@code [this.pos, this.pos + len)}.
+         */
+        public boolean contains(long pos) {
+            return this.pos <= pos && this.pos + len > pos;
+        }
+    }
+
+    /**
+     * Prefetch buffer consisting of two adjacent parts: the second part starts right where the first one ends.
+     * Parts are requested asynchronously by {@link #refreshAhead(long)} as the stream position moves.
+     */
+    private class DoubleFetchBuffer {
+        /** First buffer part, {@code null} until something was prefetched. */
+        private FetchBufferPart first;
+
+        /** Second buffer part, directly follows the first one; may be {@code null}. */
+        private FetchBufferPart second;
+
+        /**
+         * Copies fetched data from both buffers to destination array if cached region matched read position.
+         *
+         * @param dst Destination buffer.
+         * @param pos Read position in file.
+         * @param dstOff Destination buffer offset.
+         * @param len Maximum number of bytes to copy.
+         * @return Number of bytes copied.
+         * @throws IgniteCheckedException If any read operation failed.
+         */
+        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
+            assert dstOff >= 0;
+            assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff +
+                ", len=" + len + ']';
+
+            int bytesCopied = 0;
+
+            if (first != null) {
+                bytesCopied += first.flatten(dst, pos, dstOff, len);
+
+                // Continue from the second part if the first one did not satisfy the whole request.
+                if (bytesCopied != len && second != null) {
+                    assert second.pos == first.pos + first.len;
+
+                    bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied);
+                }
+            }
+
+            return bytesCopied;
+        }
+
+        /**
+         * Gets byte at specified position in buffer.
+         *
+         * @param pos Stream position.
+         * @return Read byte as an unsigned value in range {@code 0..255}.
+         * @throws IgniteCheckedException If read failed.
+         */
+        public int atPosition(long pos) throws IgniteCheckedException {
+            // Should not reach here if stream contains no data.
+            assert first != null;
+
+            if (first.contains(pos)) {
+                byte[] bytes = first.readFut.get();
+
+                return bytes[((int)(pos - first.pos))] & 0xFF;
+            }
+            else {
+                assert second != null;
+                assert second.contains(pos);
+
+                byte[] bytes = second.readFut.get();
+
+                return bytes[((int)(pos - second.pos))] & 0xFF;
+            }
+        }
+
+        /**
+         * Starts asynchronous buffer refresh if needed, depending on current position.
+         *
+         * @param pos Current stream position.
+         */
+        public void refreshAhead(long pos) {
+            if (fullPrefetch(pos)) {
+                // Position is outside of the cached window: request both parts anew.
+                first = fetch(pos, bufHalfSize);
+                second = fetch(pos + bufHalfSize, bufHalfSize);
+            }
+            else if (needFlip(pos)) {
+                // Position entered the second part: it becomes the first one and the next region is requested.
+                first = second;
+
+                second = fetch(first.pos + first.len, bufHalfSize);
+            }
+        }
+
+        /**
+         * @param pos Position from which read is expected.
+         * @return Number of bytes available to be read without blocking.
+         */
+        public int available(long pos) {
+            int available = 0;
+
+            if (first != null) {
+                if (first.contains(pos)) {
+                    if (first.ready()) {
+                        // Bytes left in the first part starting from the given position.
+                        available += (int)(first.pos + first.len - pos);
+
+                        // The second part is adjacent, so it is available as a whole once it is ready.
+                        if (second != null && second.ready())
+                            available += second.len;
+                    }
+                }
+                else {
+                    // Bytes left in the second part starting from the given position.
+                    if (second != null && second.contains(pos) && second.ready())
+                        available += (int)(second.pos + second.len - pos);
+                }
+            }
+
+            return available;
+        }
+
+        /**
+         * Checks if position shifted enough to forget previous buffer.
+         *
+         * @param pos Current position.
+         * @return {@code True} if need flip buffers.
+         */
+        private boolean needFlip(long pos) {
+            // Flip as soon as the position falls into the second buffer.
+            return second != null && second.contains(pos);
+        }
+
+        /**
+         * Determines if all cached bytes should be discarded and new region should be
+         * prefetched.
+         *
+         * @param curPos Current stream position.
+         * @return {@code True} if need to refresh both blocks.
+         */
+        private boolean fullPrefetch(long curPos) {
+            // True if no data was prefetched yet, or the position is before the first part or past the second one.
+            return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len);
+        }
+
+        /**
+         * Starts asynchronous fetch for given region.
+         *
+         * @param pos Position to read from.
+         * @param size Number of bytes to read.
+         * @return Fetch buffer part, or {@code null} if there is nothing to read at the given position.
+         */
+        private FetchBufferPart fetch(long pos, int size) {
+            long remaining = limit - pos;
+
+            // Do not request bytes beyond the stream limit.
+            size = (int)Math.min(size, remaining);
+
+            return size <= 0 ? null :
+                new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsIo.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsIo.java
new file mode 100644
index 0000000..70f645f
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsIo.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.igfs.common.IgfsMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * IO abstraction layer for IGFS client. Two kinds of messages are expected to be sent: requests with response
+ * and requests without response.
+ */
+public interface HadoopIgfsIo {
+    /**
+     * Sends given IGFS client message and asynchronously awaits the response.
+     *
+     * @param msg Message to send.
+     * @return Future that will be completed.
+     * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed).
+     */
+    public IgniteInternalFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException;
+
+    /**
+     * Sends given IGFS client message and asynchronously awaits the response. When IO detects response
+     * beginning for given message it stops reading data and passes input stream to closure which can read
+     * response in a specific way.
+     *
+     * @param msg Message to send.
+     * @param outBuf Output buffer. If {@code null}, the output buffer is not used.
+     * @param outOff Output buffer offset.
+     * @param outLen Output buffer length.
+     * @return Future that will be completed when response is returned from closure.
+     * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed).
+     */
+    public <T> IgniteInternalFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen)
+        throws IgniteCheckedException;
+
+    /**
+     * Sends given message and does not wait for response.
+     *
+     * @param msg Message to send.
+     * @throws IgniteCheckedException If send failed.
+     */
+    public void sendPlain(IgfsMessage msg) throws IgniteCheckedException;
+
+    /**
+     * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
+     * If connection is closed already, callback will be invoked synchronously inside this method.
+     *
+     * @param lsnr Event listener.
+     */
+    public void addEventListener(HadoopIgfsIpcIoListener lsnr);
+
+    /**
+     * Removes event listener that will be invoked when connection with server is lost or remote error has occurred.
+     *
+     * @param lsnr Event listener.
+     */
+    public void removeEventListener(HadoopIgfsIpcIoListener lsnr);
+}
\ No newline at end of file

Reply via email to