Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-226 4daa02139 -> 9cf92dd53


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java
new file mode 100644
index 0000000..fdf5a0a
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+/**
+ * GGFS Hadoop stream descriptor.
+ * <p>
+ * Bundles the RPC handler a stream belongs to with the stream target object
+ * and an optional stream length. Equality and hash code are based solely on
+ * the identity of the target object (reference comparison).
+ */
+public class IgfsHadoopStreamDelegate {
+    /** RPC handler. */
+    private final IgfsHadoopEx hadoop;
+
+    /** Target. */
+    private final Object target;
+
+    /** Optional stream length ({@code -1} when constructed without a length). */
+    private final long len;
+
+    /**
+     * Constructor for a descriptor without a length; the length is set to {@code -1}.
+     *
+     * @param hadoop RPC handler (asserted to be non-null).
+     * @param target Target (asserted to be non-null).
+     */
+    public IgfsHadoopStreamDelegate(IgfsHadoopEx hadoop, Object target) {
+        this(hadoop, target, -1);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param hadoop RPC handler (asserted to be non-null).
+     * @param target Target (asserted to be non-null).
+     * @param len Optional length.
+     */
+    public IgfsHadoopStreamDelegate(IgfsHadoopEx hadoop, Object target, long len) {
+        assert hadoop != null;
+        assert target != null;
+
+        this.hadoop = hadoop;
+        this.target = target;
+        this.len = len;
+    }
+
+    /**
+     * @return RPC handler.
+     */
+    public IgfsHadoopEx hadoop() {
+        return hadoop;
+    }
+
+    /**
+     * Returns the target cast to the type expected by the caller. The cast is
+     * unchecked, so a wrong expectation surfaces as a {@link ClassCastException}
+     * at the call site.
+     *
+     * @return Stream target.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T target() {
+        return (T) target;
+    }
+
+    /**
+     * @return Length.
+     */
+    public long length() {
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        // Identity hash keeps hashCode consistent with the reference-based equals below.
+        return System.identityHashCode(target);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        // instanceof is false for null, so no separate null check is needed.
+        return obj instanceof IgfsHadoopStreamDelegate && target == ((IgfsHadoopStreamDelegate)obj).target;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsHadoopStreamDelegate.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java
new file mode 100644
index 0000000..4137685
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.*;
+
+/**
+ * GGFS input stream event listener.
+ * <p>
+ * Declares two callbacks: one for stream close and one for remote errors.
+ */
+public interface IgfsHadoopStreamEventListener {
+    /**
+     * Callback invoked when the stream is being closed.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onClose() throws IgniteCheckedException;
+
+    /**
+     * Callback invoked when remote error occurs.
+     *
+     * @param errMsg Error message.
+     */
+    public void onError(String errMsg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java
new file mode 100644
index 0000000..728e3c2
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.fs.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Utility constants and methods for GGFS Hadoop file system.
+ */
+public class IgfsHadoopUtils {
+    /** Parameter name for endpoint no embed mode flag. */
+    public static final String PARAM_GGFS_ENDPOINT_NO_EMBED = "fs.ggfs.%s.endpoint.no_embed";
+
+    /** Parameter name for endpoint no shared memory flag. */
+    public static final String PARAM_GGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.ggfs.%s.endpoint.no_local_shmem";
+
+    /** Parameter name for endpoint no local TCP flag. */
+    public static final String PARAM_GGFS_ENDPOINT_NO_LOCAL_TCP = "fs.ggfs.%s.endpoint.no_local_tcp";
+
+    /**
+     * Get string parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name (format string with a single authority placeholder).
+     * @param authority Authority, {@code null} is treated as an empty string.
+     * @param dflt Default value.
+     * @return String value.
+     */
+    public static String parameter(Configuration cfg, String name, String authority, String dflt) {
+        return cfg.get(paramName(name, authority), dflt);
+    }
+
+    /**
+     * Get integer parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name (format string with a single authority placeholder).
+     * @param authority Authority, {@code null} is treated as an empty string.
+     * @param dflt Default value.
+     * @return Integer value.
+     * @throws IOException In case of parse exception; the original {@link NumberFormatException} is the cause.
+     */
+    public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException {
+        String name0 = paramName(name, authority);
+
+        try {
+            return cfg.getInt(name0, dflt);
+        }
+        catch (NumberFormatException e) {
+            // Preserve the cause so the offending value stays visible in the stack trace.
+            throw new IOException("Failed to parse parameter value to integer: " + name0, e);
+        }
+    }
+
+    /**
+     * Get boolean parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name (format string with a single authority placeholder).
+     * @param authority Authority, {@code null} is treated as an empty string.
+     * @param dflt Default value.
+     * @return Boolean value.
+     */
+    public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) {
+        return cfg.getBoolean(paramName(name, authority), dflt);
+    }
+
+    /**
+     * Cast GG exception to appropriate IO exception.
+     *
+     * @param e Exception to cast.
+     * @return Casted exception.
+     */
+    public static IOException cast(IgniteCheckedException e) {
+        return cast(e, null);
+    }
+
+    /**
+     * Cast GG exception to appropriate IO exception.
+     * <p>
+     * Causes are checked in a fixed order and the first match wins: a nested {@link IOException} is returned
+     * as is, then file-not-found and parent-not-directory causes are mapped. Directory-not-empty and
+     * path-already-exists causes are mapped only when {@code path} is not {@code null}. Anything else is
+     * wrapped into a generic {@link IOException}.
+     *
+     * @param e Exception to cast.
+     * @param path Path for exceptions.
+     * @return Casted exception.
+     */
+    @SuppressWarnings("unchecked")
+    public static IOException cast(IgniteCheckedException e, @Nullable String path) {
+        assert e != null;
+
+        // First check for any nested IOException; if exists - re-throw it.
+        if (e.hasCause(IOException.class))
+            return e.getCause(IOException.class);
+        else if (e.hasCause(IgfsFileNotFoundException.class))
+            return new FileNotFoundException(path); // TODO: Or PathNotFoundException?
+        else if (e.hasCause(IgfsParentNotDirectoryException.class))
+            return new ParentNotDirectoryException(path);
+        else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class))
+            return new PathIsNotEmptyDirectoryException(path);
+        else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class))
+            return new PathExistsException(path);
+        else
+            return new IOException(e);
+    }
+
+    /**
+     * Builds the effective configuration key by substituting the authority into the parameter name.
+     *
+     * @param name Parameter name format string.
+     * @param authority Authority, {@code null} is replaced with an empty string.
+     * @return Effective parameter name.
+     */
+    private static String paramName(String name, String authority) {
+        return String.format(name, authority != null ? authority : "");
+    }
+
+    /**
+     * Constructor.
+     */
+    private IgfsHadoopUtils() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java
new file mode 100644
index 0000000..d72a4aa
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopWrapper.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopEndpoint.*;
+import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
+
+/**
+ * Wrapper for GGFS server.
+ */
+public class IgfsHadoopWrapper implements IgfsHadoop {
+    /** Delegate. */
+    private final AtomicReference<Delegate> delegateRef = new 
AtomicReference<>();
+
+    /** Authority. */
+    private final String authority;
+
+    /** Connection string. */
+    private final IgfsHadoopEndpoint endpoint;
+
+    /** Log directory. */
+    private final String logDir;
+
+    /** Configuration. */
+    private final Configuration conf;
+
+    /** Logger. */
+    private final Log log;
+
+    /**
+     * Constructor. Stores the arguments and builds the endpoint from the
+     * authority.
+     *
+     * @param authority Authority (connection string).
+     * @param logDir Log directory for server.
+     * @param conf Configuration.
+     * @param log Current logger.
+     * @throws IOException If the endpoint cannot be created from the authority.
+     */
+    public IgfsHadoopWrapper(String authority, String logDir, Configuration 
conf, Log log) throws IOException {
+        try {
+            this.authority = authority;
+            this.endpoint = new IgfsHadoopEndpoint(authority);
+            this.logDir = logDir;
+            this.conf = conf;
+            this.log = log;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to parse endpoint: " + authority, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the handshake response stored in the current delegate. The
+     * {@code logDir} argument is not referenced in this method body.
+     */
+    @Override public IgfsHandshakeResponse handshake(String logDir) throws 
IOException {
+        return withReconnectHandling(new 
FileSystemClosure<IgfsHandshakeResponse>() {
+            @Override public IgfsHandshakeResponse apply(IgfsHadoopEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, 
IOException {
+                return hndResp;
+            }
+        });
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Detaches the current delegate (if any) with a compare-and-set and closes
+     * it only when this call performed the detach.
+     */
+    @Override public void close(boolean force) {
+        Delegate delegate = delegateRef.get();
+
+        if (delegate != null && delegateRef.compareAndSet(delegate, null))
+            delegate.close(force);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code info} on the current RPC handler with reconnect handling;
+     * {@code path} is also supplied for exception conversion.
+     */
+    @Override public IgfsFile info(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+            @Override public IgfsFile apply(IgfsHadoopEx hadoop, 
IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.info(path);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code update} on the current RPC handler with reconnect handling;
+     * {@code path} is also supplied for exception conversion.
+     */
+    @Override public IgfsFile update(final IgfsPath path, final Map<String, 
String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+            @Override public IgfsFile apply(IgfsHadoopEx hadoop, 
IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.update(path, props);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code setTimes} on the current RPC handler with reconnect
+     * handling; {@code path} is also supplied for exception conversion.
+     */
+    @Override public Boolean setTimes(final IgfsPath path, final long 
accessTime, final long modificationTime)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(IgfsHadoopEx hadoop, 
IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.setTimes(path, accessTime, modificationTime);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code rename} on the current RPC handler with reconnect handling.
+     * Only {@code src} is supplied for exception conversion, so converted
+     * exceptions carry the source path and not {@code dest}.
+     */
+    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) 
throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(IgfsHadoopEx hadoop, 
IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.rename(src, dest);
+            }
+        }, src);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code delete} on the current RPC handler with reconnect handling;
+     * {@code path} is also supplied for exception conversion.
+     */
+    @Override public Boolean delete(final IgfsPath path, final boolean 
recursive) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(IgfsHadoopEx hadoop, 
IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.delete(path, recursive);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code affinity} on the current RPC handler with reconnect
+     * handling; {@code path} is also supplied for exception conversion.
+     */
+    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath 
path, final long start,
+        final long len) throws IOException {
+        return withReconnectHandling(new 
FileSystemClosure<Collection<IgfsBlockLocation>>() {
+            @Override public Collection<IgfsBlockLocation> apply(IgfsHadoopEx 
hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, 
IOException {
+                return hadoop.affinity(path, start, len);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code contentSummary} on the current RPC handler with reconnect
+     * handling; {@code path} is also supplied for exception conversion.
+     */
+    @Override public IgfsPathSummary contentSummary(final IgfsPath path) 
throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() {
+            @Override public IgfsPathSummary apply(IgfsHadoopEx hadoop, 
IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.contentSummary(path);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code mkdirs} on the current RPC handler with reconnect handling;
+     * {@code path} is also supplied for exception conversion.
+     */
+    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, 
String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(IgfsHadoopEx hadoop, 
IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.mkdirs(path, props);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code listFiles} on the current RPC handler with reconnect
+     * handling; {@code path} is also supplied for exception conversion.
+     */
+    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) 
throws IOException {
+        return withReconnectHandling(new 
FileSystemClosure<Collection<IgfsFile>>() {
+            @Override public Collection<IgfsFile> apply(IgfsHadoopEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, 
IOException {
+                return hadoop.listFiles(path);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code listPaths} on the current RPC handler with reconnect
+     * handling; {@code path} is also supplied for exception conversion.
+     */
+    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) 
throws IOException {
+        return withReconnectHandling(new 
FileSystemClosure<Collection<IgfsPath>>() {
+            @Override public Collection<IgfsPath> apply(IgfsHadoopEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, 
IOException {
+                return hadoop.listPaths(path);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code fsStatus} on the current RPC handler with reconnect
+     * handling; no path is supplied for exception conversion.
+     */
+    @Override public IgfsStatus fsStatus() throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsStatus>() {
+            @Override public IgfsStatus apply(IgfsHadoopEx hadoop, 
IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.fsStatus();
+            }
+        });
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls the single-argument {@code open} on the current RPC handler with
+     * reconnect handling; {@code path} is also supplied for exception
+     * conversion.
+     */
+    @Override public IgfsHadoopStreamDelegate open(final IgfsPath path) throws 
IOException {
+        return withReconnectHandling(new 
FileSystemClosure<IgfsHadoopStreamDelegate>() {
+            @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx 
hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, 
IOException {
+                return hadoop.open(path);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls the two-argument {@code open} on the current RPC handler with
+     * reconnect handling, forwarding {@code seqReadsBeforePrefetch} unchanged;
+     * {@code path} is also supplied for exception conversion.
+     */
+    @Override public IgfsHadoopStreamDelegate open(final IgfsPath path, final 
int seqReadsBeforePrefetch)
+        throws IOException {
+        return withReconnectHandling(new 
FileSystemClosure<IgfsHadoopStreamDelegate>() {
+            @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx 
hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, 
IOException {
+                return hadoop.open(path, seqReadsBeforePrefetch);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code create} on the current RPC handler with reconnect handling,
+     * forwarding all arguments unchanged; {@code path} is also supplied for
+     * exception conversion.
+     */
+    @Override public IgfsHadoopStreamDelegate create(final IgfsPath path, 
final boolean overwrite,
+        final boolean colocate, final int replication, final long blockSize, 
@Nullable final Map<String, String> props)
+        throws IOException {
+        return withReconnectHandling(new 
FileSystemClosure<IgfsHadoopStreamDelegate>() {
+            @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx 
hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, 
IOException {
+                return hadoop.create(path, overwrite, colocate, replication, 
blockSize, props);
+            }
+        }, path);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calls {@code append} on the current RPC handler with reconnect handling;
+     * {@code path} is also supplied for exception conversion.
+     */
+    @Override public IgfsHadoopStreamDelegate append(final IgfsPath path, 
final boolean create,
+        @Nullable final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new 
FileSystemClosure<IgfsHadoopStreamDelegate>() {
+            @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx 
hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, 
IOException {
+                return hadoop.append(path, create, props);
+            }
+        }, path);
+    }
+
+    /**
+     * Execute closure which is not path-specific. Equivalent to calling the
+     * two-argument overload with a {@code null} path.
+     *
+     * @param clo Closure.
+     * @return Result.
+     * @throws IOException If failed.
+     */
+    private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws 
IOException {
+        return withReconnectHandling(clo, null);
+    }
+
+    /**
+     * Execute closure.
+     * <p>
+     * The closure is attempted at most twice. When an attempt fails with
+     * {@code IgfsHadoopCommunicationException}, the failed delegate is removed
+     * from {@code delegateRef} and force-closed (unless it was already doomed),
+     * and the next attempt obtains a delegate again. Any other
+     * {@code IgniteCheckedException} is converted with
+     * {@code IgfsHadoopUtils.cast} and thrown immediately without a retry.
+     * A doomed delegate is closed in the {@code finally} block after the call.
+     *
+     * @param clo Closure.
+     * @param path Path for exceptions.
+     * @return Result.
+     * @throws IOException If failed; when both attempts fail with a
+     *      communication error, the last such error is attached as the cause.
+     */
+    private <T> T withReconnectHandling(final FileSystemClosure<T> clo, 
@Nullable IgfsPath path)
+        throws IOException {
+        Exception err = null;
+
+        for (int i = 0; i < 2; i++) { // At most two attempts.
+            Delegate curDelegate = null;
+
+            boolean close = false;
+            boolean force = false;
+
+            try {
+                curDelegate = delegate();
+
+                assert curDelegate != null;
+
+                close = curDelegate.doomed; // Doomed delegates are closed after this single use.
+
+                return clo.apply(curDelegate.hadoop, curDelegate.hndResp);
+            }
+            catch (IgfsHadoopCommunicationException e) {
+                if (curDelegate != null && !curDelegate.doomed) {
+                    // Try getting rid of faulty delegate ASAP.
+                    delegateRef.compareAndSet(curDelegate, null);
+
+                    close = true;
+                    force = true;
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send message to a server: " + e);
+
+                err = e;
+            }
+            catch (IgniteCheckedException e) {
+                throw IgfsHadoopUtils.cast(e, path != null ? path.toString() : 
null);
+            }
+            finally {
+                if (close) {
+                    assert curDelegate != null;
+
+                    curDelegate.close(force);
+                }
+            }
+        }
+
+        throw new IOException("Failed to communicate with GGFS.", err);
+    }
+
+    /**
+     * Get delegate creating it if needed.
+     * <p>
+     * If no delegate is installed yet, connection modes are tried in this order, stopping at the first success:
+     * <ol>
+     *     <li>in-process (same VM), unless disabled by {@code PARAM_GGFS_ENDPOINT_NO_EMBED};</li>
+     *     <li>local shared memory, unless disabled by {@code PARAM_GGFS_ENDPOINT_NO_LOCAL_SHMEM} or on Windows;</li>
+     *     <li>local TCP, unless disabled by {@code PARAM_GGFS_ENDPOINT_NO_LOCAL_TCP};</li>
+     *     <li>remote TCP, when local TCP was skipped or the endpoint host is not {@code LOCALHOST}.</li>
+     * </ol>
+     * A newly created delegate is installed with a compare-and-set; if another delegate was installed
+     * concurrently, the new one is marked as doomed and still returned to the caller.
+     *
+     * @return Delegate.
+     * @throws IgfsHadoopCommunicationException If none of the connection attempts succeeded; the last recorded
+     *      error is attached as the cause.
+     */
+    private Delegate delegate() throws IgfsHadoopCommunicationException {
+        Exception err = null;
+
+        // 1. If delegate is set, return it immediately.
+        Delegate curDelegate = delegateRef.get();
+
+        if (curDelegate != null)
+            return curDelegate;
+
+        // 2. Guess that we are in the same VM.
+        if (!parameter(conf, PARAM_GGFS_ENDPOINT_NO_EMBED, authority, false)) {
+            IgfsEx ggfs = null;
+
+            if (endpoint.grid() == null) {
+                try {
+                    Ignite ignite = G.ignite();
+
+                    ggfs = (IgfsEx)ignite.fileSystem(endpoint.ggfs());
+                }
+                catch (Exception e) {
+                    err = e;
+                }
+            }
+            else {
+                // NOTE(review): every started grid is probed here; the grid name from the endpoint is not
+                // used to select one - confirm this is intended.
+                for (Ignite ignite : G.allGrids()) {
+                    try {
+                        ggfs = (IgfsEx)ignite.fileSystem(endpoint.ggfs());
+
+                        break;
+                    }
+                    catch (Exception e) {
+                        err = e;
+                    }
+                }
+            }
+
+            if (ggfs != null) {
+                IgfsHadoopEx hadoop = null;
+
+                try {
+                    hadoop = new IgfsHadoopInProc(ggfs, log);
+
+                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    // Handler is still null if its constructor failed; guard to avoid NPE hiding the real error.
+                    if (hadoop != null && e instanceof IgfsHadoopCommunicationException)
+                        hadoop.close(true);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to connect to in-proc GGFS, fallback to IPC mode.", e);
+
+                    err = e;
+                }
+            }
+        }
+
+        // 3. Try connecting using shmem.
+        if (!parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) {
+            if (curDelegate == null && !U.isWindows()) {
+                IgfsHadoopEx hadoop = null;
+
+                try {
+                    hadoop = new IgfsHadoopOutProc(endpoint.port(), endpoint.grid(), endpoint.ggfs(), log);
+
+                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    // Handler is still null if its constructor failed; guard to avoid NPE hiding the real error.
+                    if (hadoop != null && e instanceof IgfsHadoopCommunicationException)
+                        hadoop.close(true);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to connect to out-proc local GGFS using shmem.", e);
+
+                    err = e;
+                }
+            }
+        }
+
+        // 4. Try local TCP connection.
+        boolean skipLocTcp = parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
+
+        if (!skipLocTcp) {
+            if (curDelegate == null) {
+                IgfsHadoopEx hadoop = null;
+
+                try {
+                    hadoop = new IgfsHadoopOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.ggfs(),
+                        log);
+
+                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    // Handler is still null if its constructor failed; guard to avoid NPE hiding the real error.
+                    if (hadoop != null && e instanceof IgfsHadoopCommunicationException)
+                        hadoop.close(true);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to connect to out-proc local GGFS using TCP.", e);
+
+                    err = e;
+                }
+            }
+        }
+
+        // 5. Try remote TCP connection.
+        if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
+            IgfsHadoopEx hadoop = null;
+
+            try {
+                hadoop = new IgfsHadoopOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.ggfs(), log);
+
+                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+            }
+            catch (IOException | IgniteCheckedException e) {
+                // Handler is still null if its constructor failed; guard to avoid NPE hiding the real error.
+                if (hadoop != null && e instanceof IgfsHadoopCommunicationException)
+                    hadoop.close(true);
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to connect to out-proc remote GGFS using TCP.", e);
+
+                err = e;
+            }
+        }
+
+        if (curDelegate != null) {
+            // Lost the installation race: let the caller use this delegate once and close it afterwards.
+            if (!delegateRef.compareAndSet(null, curDelegate))
+                curDelegate.doomed = true;
+
+            return curDelegate;
+        }
+        else
+            throw new IgfsHadoopCommunicationException("Failed to connect to GGFS: " + endpoint, err);
+    }
+
+    /**
+     * File system operation closure. Receives the RPC handler and the
+     * handshake response of the delegate it is executed against.
+     */
+    private static interface FileSystemClosure<T> {
+        /**
+         * Call closure body.
+         *
+         * @param hadoop RPC handler.
+         * @param hndResp Handshake response.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         * @throws IOException If failed.
+         */
+        public T apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) 
throws IgniteCheckedException, IOException;
+    }
+
+    /**
+     * Delegate. Holds an RPC handler together with the handshake response
+     * obtained from it.
+     */
+    private static class Delegate {
+        /** RPC handler. */
+        private final IgfsHadoopEx hadoop;
+
+        /** Handshake response. */
+        private final IgfsHandshakeResponse hndResp;
+
+        /** Close guard. */
+        private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+        /**
+         * Whether this delegate must be closed at the end of the next
+         * invocation. Set when the delegate could not be installed into
+         * the wrapper's delegate reference because another one was
+         * already there.
+         */
+        private boolean doomed;
+
+        /**
+         * Constructor.
+         *
+         * @param hadoop Hadoop.
+         * @param hndResp Handshake response.
+         */
+        private Delegate(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) {
+            this.hadoop = hadoop;
+            this.hndResp = hndResp;
+        }
+
+        /**
+         * Close underlying RPC handler. The close guard ensures the handler
+         * is closed at most once, by the first call.
+         *
+         * @param force Force flag.
+         */
+        private void close(boolean force) {
+            if (closeGuard.compareAndSet(false, true))
+                hadoop.close(force);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html
new file mode 100644
index 0000000..4520df8
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/package.html
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 
"http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+    <!-- Package description. -->
+    Contains GGFS client classes.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html
new file mode 100644
index 0000000..4f47151
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/package.html
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+    <!-- Package description. -->
+    Contains GGFS client and common classes.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
index 44a8e9b..abe8aa1 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
@@ -21,7 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.fs.hadoop.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.fs.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.util.typedef.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
index c513ebd..8f140ff 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
@@ -20,7 +20,7 @@ package org.apache.ignite.igfs;
 import junit.framework.*;
 import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.fs.hadoop.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.typedef.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
index f4d758e..f1d1268 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.permission.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.fs.hadoop.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.fs.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
index 3c6d000..6e55d59 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.fs.hadoop.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.fs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
index 577b910..b7c6b05 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
@@ -25,7 +25,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem;
-import org.apache.ignite.internal.fs.hadoop.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.fs.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemClientSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemClientSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemClientSelfTest.java
index 0d9c740..1dc8ff7 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemClientSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemClientSelfTest.java
@@ -20,8 +20,8 @@ package org.apache.ignite.igfs;
 import org.apache.commons.logging.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.fs.common.*;
-import org.apache.ignite.internal.fs.hadoop.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.fs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemHandshakeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemHandshakeSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemHandshakeSelfTest.java
index 56e117e..ca82d39 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemHandshakeSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemHandshakeSelfTest.java
@@ -41,7 +41,7 @@ import static org.apache.ignite.cache.CacheDistributionMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
 import static org.apache.ignite.igfs.IgfsMode.*;
-import static org.apache.ignite.internal.fs.hadoop.IgfsHadoopUtils.*;
+import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
 import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemIpcCacheSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemIpcCacheSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemIpcCacheSelfTest.java
index 6d2739b..84340f6 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemIpcCacheSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemIpcCacheSelfTest.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.fs.hadoop.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.fs.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.typedef.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerSelfTest.java
index d7cef36..2075331 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerSelfTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.igfs;
 
-import org.apache.ignite.internal.fs.common.*;
+import org.apache.ignite.internal.igfs.common.*;
 import org.apache.ignite.internal.processors.fs.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
@@ -25,7 +25,7 @@ import java.io.*;
 import java.util.*;
 
 import static org.apache.ignite.igfs.IgfsMode.*;
-import static org.apache.ignite.internal.fs.common.IgfsLogger.*;
+import static org.apache.ignite.internal.igfs.common.IgfsLogger.*;
 
 /**
  * Grid GGFS client logger test.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerStateSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerStateSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerStateSelfTest.java
index affedc0..a135fd7 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerStateSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemLoggerStateSelfTest.java
@@ -23,7 +23,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.hadoop.v1.*;
-import org.apache.ignite.internal.fs.common.*;
+import org.apache.ignite.internal.igfs.common.*;
 import org.apache.ignite.internal.processors.fs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
index 728cf18..c8a388f 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.hadoop.v1.*;
-import org.apache.ignite.internal.fs.hadoop.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.fs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;

Reply via email to