Repository: ignite
Updated Branches:
  refs/heads/ignite-3553 6a3e59192 -> b77bee0fe


Added message processing logic.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b5322e8a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b5322e8a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b5322e8a

Branch: refs/heads/ignite-3553
Commit: b5322e8af63bd1194acc1c1fc0ea1efebd434afd
Parents: 6a3e591
Author: vozerov-gridgain <[email protected]>
Authored: Wed Jul 27 15:35:35 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Wed Jul 27 15:35:35 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/GridTopic.java   |   2 +-
 .../communication/GridIoMessageFactory.java     |   8 +-
 .../internal/processors/igfs/IgfsContext.java   |   8 +-
 .../internal/processors/igfs/IgfsProcessor.java |   6 +-
 .../igfs/client/IgfsClientAbstractCallable.java |  10 +-
 .../igfs/client/IgfsClientClosureManager.java   | 191 ----------------
 .../client/IgfsClientClosureOutOperation.java   |  76 -------
 .../igfs/client/IgfsClientClosureRequest.java   | 203 -----------------
 .../igfs/client/IgfsClientClosureResponse.java  | 133 -----------
 .../client/IgfsClientClosureResponseType.java   |  53 -----
 .../igfs/client/IgfsClientManager.java          | 228 +++++++++++++++++++
 .../igfs/client/IgfsClientOutOperation.java     |  76 +++++++
 .../igfs/client/IgfsClientRequest.java          | 203 +++++++++++++++++
 .../igfs/client/IgfsClientResponse.java         | 133 +++++++++++
 .../igfs/client/IgfsClientResponseType.java     |  53 +++++
 15 files changed, 714 insertions(+), 669 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index e5d47c0..e073d41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -97,7 +97,7 @@ public enum GridTopic {
     TOPIC_TX,
 
     /** Topic to handle IGFS closures. */
-    TOPIC_IGFS_CLO;
+    TOPIC_IGFS_CLI;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index b0f75dc..1097ad5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -120,8 +120,8 @@ import org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange;
 import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerRequest;
 import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse;
 import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage;
-import org.apache.ignite.internal.processors.igfs.client.IgfsClientClosureRequest;
-import org.apache.ignite.internal.processors.igfs.client.IgfsClientClosureResponse;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientRequest;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@ -163,12 +163,12 @@ public class GridIoMessageFactory implements MessageFactory {
 
         switch (type) {
             case -28:
-                msg = new IgfsClientClosureResponse();
+                msg = new IgfsClientResponse();
 
                 break;
 
             case -27:
-                msg = new IgfsClientClosureRequest();
+                msg = new IgfsClientRequest();
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 3f67156..ed9f302 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -24,7 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.igfs.client.IgfsClientClosureManager;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientManager;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -41,7 +41,7 @@ public class IgfsContext {
     private List<IgfsManager> mgrs = new LinkedList<>();
 
     /** Closure manager. */
-    private final IgfsClientClosureManager cloMgr;
+    private final IgfsClientManager cloMgr;
 
     /** Meta manager. */
     private final IgfsMetaManager metaMgr;
@@ -73,7 +73,7 @@ public class IgfsContext {
     public IgfsContext(
         GridKernalContext ctx,
         FileSystemConfiguration cfg,
-        IgfsClientClosureManager cloMgr,
+        IgfsClientManager cloMgr,
         IgfsMetaManager metaMgr,
         IgfsDataManager dataMgr,
         IgfsServerManager srvMgr,
@@ -123,7 +123,7 @@ public class IgfsContext {
     /**
      * @return Closure manager.
      */
-    public IgfsClientClosureManager closure() {
+    public IgfsClientManager closure() {
         return cloMgr;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 7d6753d..58e0d48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -33,7 +33,7 @@ import org.apache.ignite.igfs.mapreduce.IgfsJob;
 import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.processors.igfs.client.IgfsClientClosureManager;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientManager;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.ipc.IpcServerEndpoint;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -85,7 +85,7 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
         new ConcurrentHashMap8<>();
 
     /** Client closure manager. */
-    private final IgfsClientClosureManager cloMgr;
+    private final IgfsClientManager cloMgr;
 
     /**
      * @param ctx Kernal context.
@@ -93,7 +93,7 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
     public IgfsProcessor(GridKernalContext ctx) {
         super(ctx);
 
-        cloMgr = new IgfsClientClosureManager(ctx);
+        cloMgr = new IgfsClientManager(ctx);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java
index a184f2e..92bdf30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java
@@ -136,6 +136,7 @@ public abstract class IgfsClientAbstractCallable<T> implements IgniteCallable<T>
     }
 
     /** {@inheritDoc} */
+    // TODO.
     @Override public final T call() throws Exception {
         assert ignite != null;
 
@@ -151,7 +152,14 @@ public abstract class IgfsClientAbstractCallable<T> implements IgniteCallable<T>
      * @return Result.
      * @throws Exception If failed.
      */
-    protected abstract T call0(IgfsContext ctx) throws Exception;
+    public abstract T call0(IgfsContext ctx) throws Exception;
+
+    /**
+     * @return IGFS name.
+     */
+    public String igfsName() {
+        return igfsName;
+    }
 
     /** {@inheritDoc} */
     @Override public final void writeBinary(BinaryWriter writer) throws BinaryObjectException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java
deleted file mode 100644
index 4da47e0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs.client;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.igfs.IgfsContext;
-import org.apache.ignite.internal.processors.igfs.IgfsManager;
-import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-/**
- * Manager to handle IGFS client closures.
- */
-public class IgfsClientClosureManager extends IgfsManager {
-    /** Pending input operations received when manager is not started yet. */
-    private final ConcurrentLinkedDeque<IgfsClientClosureRequest> pending = new ConcurrentLinkedDeque<>();
-
-    /** Outgoing operations. */
-    private final Map<Long, IgfsClientClosureOutOperation> outOps = new ConcurrentHashMap<>();
-
-    /** Marshaller. */
-    private final Marshaller marsh;
-
-    /** Whether manager is fully started and ready to process requests. */
-    private volatile boolean ready;
-
-    /** Stopping flag. */
-    private volatile boolean stopping;
-
-    /** RW lock for synchronization. */
-    private final StripedCompositeReadWriteLock rwLock =
-        new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors() * 2);
-
-    /**
-     * Constructor.
-     *
-     * @param ctx Kernal context.
-     */
-    public IgfsClientClosureManager(GridKernalContext ctx) {
-        super(ctx);
-
-        marsh = ctx.config().getMarshaller();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void start0() throws IgniteCheckedException {
-        // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onKernalStart0() throws IgniteCheckedException {
-        // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onKernalStop0(boolean cancel) {
-        // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void stop0(boolean cancel) {
-        // TODO
-    }
-
-    /**
-     * Execute IGFS closure.
-     *
-     * @param igfsCtx IGFS context.
-     * @param clo Closure.
-     * @return Result.
-     */
-    public <T> T execute(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo) throws IgniteCheckedException {
-        return executeAsync(igfsCtx, clo).get();
-    }
-
-    /**
-     * Execute IGFS closure asynchronously.
-     *
-     * @param igfsCtx IGFS context.
-     * @param clo Closure.
-     * @return Future.
-     */
-    public <T> IgniteInternalFuture<T> executeAsync(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo) {
-
-        // TODO
-
-        return null;
-    }
-
-    /**
-     * Create closure response.
-     *
-     * @param msgId Message ID.
-     * @param res Response.
-     * @param marsh Marshaller.
-     * @return Response.
-     */
-    private IgfsClientClosureResponse createResponse(long msgId, @Nullable Object res, @Nullable Throwable resErr,
-        Marshaller marsh) {
-        try {
-            if (resErr != null)
-                return new IgfsClientClosureResponse(msgId, IgfsClientClosureResponseType.ERR, null,
-                    marsh.marshal(resErr));
-            else {
-                if (res == null)
-                    return new IgfsClientClosureResponse(msgId, IgfsClientClosureResponseType.NULL, null, null);
-                else if (res instanceof Boolean)
-                    return new IgfsClientClosureResponse(msgId, IgfsClientClosureResponseType.BOOL, res, null);
-                else
-                    return new IgfsClientClosureResponse(msgId, IgfsClientClosureResponseType.OBJ, null,
-                        marsh.marshal(res));
-            }
-        }
-        catch (Exception e) {
-            U.error(log, "Failed to marshal IGFS closure result [msgId=" + msgId + ", res=" + res +
-                ", resErr=" + resErr + ']', e);
-
-            return new IgfsClientClosureResponse(msgId, IgfsClientClosureResponseType.MARSH_ERR, null, null);
-        }
-    }
-
-    /**
-     * Handle node leave event.
-     *
-     * @param nodeId Node ID.
-     */
-    private void onNodeLeft(UUID nodeId) {
-        // TODO
-    }
-
-    /**
-     * Handle closure request.
-     *
-     * @param req Request.
-     */
-    private void onClosureRequest(IgfsClientClosureRequest req) {
-        rwLock.readLock().lock();
-
-        try {
-            if (stopping)
-                return; // Discovery listener on remote node will handle node leave.
-
-            if (ready)
-                onClosureRequest0(req); // Normal execution flow.
-            else
-                pending.add(req); // Add to pending set if manager is not fully started yet.
-        }
-        finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    /**
-     * Actual request processing. Happens inside appropriate thread pool.
-     *
-     * @param req Request.
-     */
-    private void onClosureRequest0(IgfsClientClosureRequest req) {
-        // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsClientClosureManager.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureOutOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureOutOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureOutOperation.java
deleted file mode 100644
index 2eb15a6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureOutOperation.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs.client;
-
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.util.UUID;
-
-/**
- * IGFS client closure outgoing opeartion descriptor.
- */
-public class IgfsClientClosureOutOperation {
-    /** Target node ID. */
-    private final UUID nodeId;
-
-    /** Target operation. */
-    private final IgfsClientAbstractCallable target;
-
-    /** Future completed when operation is ready. */
-    private final IgniteInternalFuture fut;
-
-    /**
-     * Constructor.
-     *
-     * @param nodeId Target node ID.
-     * @param target Target operation.
-     * @param fut Future completed when operation is ready.
-     */
-    public IgfsClientClosureOutOperation(UUID nodeId, IgfsClientAbstractCallable target, IgniteInternalFuture fut) {
-        this.nodeId = nodeId;
-        this.target = target;
-        this.fut = fut;
-    }
-
-    /**
-     * @return Target node ID.
-     */
-    public UUID nodeId() {
-        return nodeId;
-    }
-
-    /**
-     * @return Target operation.
-     */
-    public IgfsClientAbstractCallable target() {
-        return target;
-    }
-
-    /**
-     * @return Future completed when operation is ready.
-     */
-    public IgniteInternalFuture future() {
-        return fut;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsClientClosureOutOperation.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureRequest.java
deleted file mode 100644
index 1eda024..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureRequest.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs.client;
-
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-/**
- * IGFS client closure execute request.
- */
-public class IgfsClientClosureRequest implements Message {
-    /** Base fields (all except of target) count. */
-    private static final byte BASE_FIELDS_CNT = 3;
-
-    /** Originating node ID. */
-    private UUID nodeId;
-
-    /** Message ID. */
-    private long msgId;
-
-    /** Target callable. */
-    @GridToStringInclude
-    private IgfsClientAbstractCallable target;
-
-    /**
-     * Default constructor.
-     */
-    public IgfsClientClosureRequest() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param nodeId Originating node ID.
-     * @param msgId Message ID.
-     * @param target Target callable.
-     */
-    public IgfsClientClosureRequest(UUID nodeId, long msgId, IgfsClientAbstractCallable target) {
-        assert nodeId != null;
-        assert target != null;
-
-        this.nodeId = nodeId;
-        this.msgId = msgId;
-        this.target = target;
-    }
-
-    /**
-     * @return Node ID.
-     */
-    public UUID nodeId() {
-        return nodeId;
-    }
-
-    /**
-     * @return Message ID.
-     */
-    public long messageId() {
-        return msgId;
-    }
-
-    /**
-     * @return Target callable.
-     */
-    public IgfsClientAbstractCallable target() {
-        return target;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return -27;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return (byte)(BASE_FIELDS_CNT + target.fieldsCount());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        byte fieldsCount = fieldsCount();
-
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeUuid("nodeId", nodeId))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong("msgId", msgId))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeShort("typeId", target.typeId()))
-                    return false;
-
-                writer.incrementState();
-
-            default:
-                while (writer.state() < fieldsCount) {
-                    if (!target.writeTo(writer, writer.state() - BASE_FIELDS_CNT))
-                        return false;
-
-                    writer.incrementState();
-                }
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                nodeId = reader.readUuid("nodeId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                msgId = reader.readLong("msgId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                short typeId;
-
-                typeId = reader.readShort("typeId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                target = IgfsClientAbstractCallable.callableForTypeId(typeId);
-
-                reader.incrementState();
-
-            default:
-                while (reader.state() < fieldsCount()) {
-                    target.readFrom(reader, reader.state() - BASE_FIELDS_CNT);
-
-                    if (!reader.isLastRead())
-                        return false;
-
-                    reader.incrementState();
-                }
-        }
-
-        return reader.afterMessageRead(IgfsClientClosureRequest.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsClientClosureRequest.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java
deleted file mode 100644
index 40e2802..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs.client;
-
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-
-import java.nio.ByteBuffer;
-
-/**
- * IGFS client closure execute response.
- */
-public class IgfsClientClosureResponse implements Message {
-    /** Message ID. */
-    private long msgId;
-
-    /** Response type. */
-    private IgfsClientClosureResponseType typ;
-
-    /** Result. */
-    @GridToStringInclude
-    private Object res;
-
-    /** Result bytes. */
-    @GridToStringExclude
-    private byte[] resBytes;
-
-    /**
-     * Default constructor.
-     */
-    public IgfsClientClosureResponse() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param msgId Message ID.
-     * @param typ Type.
-     * @param res Result.
-     * @param resBytes Result bytes.
-     */
-    public IgfsClientClosureResponse(long msgId, IgfsClientClosureResponseType typ, @Nullable Object res,
-        @Nullable byte[] resBytes) {
-        this.msgId = msgId;
-        this.typ = typ;
-        this.res = res;
-        this.resBytes = resBytes;
-    }
-
-    /**
-     * @return Message ID.
-     */
-    public long messageId() {
-        return msgId;
-    }
-
-    /**
-     * @return Type.
-     */
-    public IgfsClientClosureResponseType type() {
-        return typ;
-    }
-
-    /**
-     * @return Result.
-     */
-    @Nullable public Object result() {
-        return res;
-    }
-
-    /**
-     * @return Result bytes.
-     */
-    @Nullable public byte[] resultBytes() {
-        return resBytes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return -28;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return (byte)(typ == IgfsClientClosureResponseType.NULL || typ == IgfsClientClosureResponseType.MARSH_ERR ?
-            2 : 3);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        // TODO
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        // TODO
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsClientClosureResponse.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponseType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponseType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponseType.java
deleted file mode 100644
index 8fd45c9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponseType.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs.client;
-
-import org.jetbrains.annotations.Nullable;
-
-/**
- * IGFS client closure response type.
- */
-public enum IgfsClientClosureResponseType {
-    /** Object. */
-    OBJ,
-
-    /** Null result. */
-    NULL,
-
-    /** Boolean result. */
-    BOOL,
-
-    /** Error. */
-    ERR,
-
-    /** Marshalling error. */
-    MARSH_ERR;
-
-    /** Enum values. */
-    private static final IgfsClientClosureResponseType[] VALS = values();
-
-    /**
-     * Efficiently gets enumerated value from its ordinal.
-     *
-     * @param ord Ordinal value.
-     * @return Enumerated value.
-     */
-    @Nullable public static IgfsClientClosureResponseType fromOrdinal(int ord) 
{
-        return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
new file mode 100644
index 0000000..59297bf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.client;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.igfs.IgfsException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.igfs.IgfsContext;
+import org.apache.ignite.internal.processors.igfs.IgfsImpl;
+import org.apache.ignite.internal.processors.igfs.IgfsManager;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * Manager to handle IGFS client closures.
+ */
+public class IgfsClientManager extends IgfsManager {
+    /** Pending input operations received when manager is not started yet. */
+    private final ConcurrentLinkedDeque<IgfsClientRequest> pending = new 
ConcurrentLinkedDeque<>();
+
+    /** Outgoing operations. */
+    private final Map<Long, IgfsClientOutOperation> outOps = new 
ConcurrentHashMap<>();
+
+    /** Marshaller. */
+    private final Marshaller marsh;
+
+    /** Whether manager is fully started and ready to process requests. */
+    private volatile boolean ready;
+
+    /** Stopping flag. */
+    private volatile boolean stopping;
+
+    /** RW lock for synchronization. */
+    private final StripedCompositeReadWriteLock rwLock =
+        new 
StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors() * 2);
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    public IgfsClientManager(GridKernalContext ctx) {
+        super(ctx);
+
+        marsh = ctx.config().getMarshaller();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStart0() throws IgniteCheckedException {
+        // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStop0(boolean cancel) {
+        // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0(boolean cancel) {
+        // TODO
+    }
+
+    /**
+     * Execute IGFS closure.
+     *
+     * @param igfsCtx IGFS context.
+     * @param clo Closure.
+     * @return Result.
+     */
+    public <T> T execute(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> 
clo) throws IgniteCheckedException {
+        return executeAsync(igfsCtx, clo).get();
+    }
+
+    /**
+     * Execute IGFS closure asynchronously.
+     *
+     * @param igfsCtx IGFS context.
+     * @param clo Closure.
+     * @return Future.
+     */
+    public <T> IgniteInternalFuture<T> executeAsync(IgfsContext igfsCtx, 
IgfsClientAbstractCallable<T> clo) {
+
+        // TODO
+
+        return null;
+    }
+
+    /**
+     * Create closure response.
+     *
+     * @param msgId Message ID.
+     * @param res Result.
+     * @return Response.
+     */
+    private IgfsClientResponse createResponse(long msgId, @Nullable Object 
res, @Nullable Throwable resErr) {
+        try {
+            if (resErr != null)
+                return new IgfsClientResponse(msgId, 
IgfsClientResponseType.ERR, null,
+                    marsh.marshal(resErr));
+            else {
+                if (res == null)
+                    return new IgfsClientResponse(msgId, 
IgfsClientResponseType.NULL, null, null);
+                else if (res instanceof Boolean)
+                    return new IgfsClientResponse(msgId, 
IgfsClientResponseType.BOOL, res, null);
+                else
+                    return new IgfsClientResponse(msgId, 
IgfsClientResponseType.OBJ, null,
+                        marsh.marshal(res));
+            }
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to marshal IGFS closure result [msgId=" + 
msgId + ", res=" + res +
+                ", resErr=" + resErr + ']', e);
+
+            return new IgfsClientResponse(msgId, 
IgfsClientResponseType.MARSH_ERR, null, null);
+        }
+    }
+
+    /**
+     * Handle node leave event.
+     *
+     * @param nodeId Node ID.
+     */
+    private void onNodeLeft(UUID nodeId) {
+        // TODO
+    }
+
+    /**
+     * Handle request.
+     *
+     * @param req Request.
+     */
+    private void onRequest(IgfsClientRequest req) {
+        rwLock.readLock().lock();
+
+        try {
+            if (stopping)
+                return; // Discovery listener on remote node will handle node 
leave.
+
+            if (ready)
+                processRequest(req); // Normal execution flow.
+            else
+                pending.add(req); // Add to pending set if manager is not 
fully started yet.
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Handle response.
+     *
+     * @param resp Response.
+     */
+    private void onResponse(IgfsClientResponse resp) {
+        // TODO.
+    }
+
+    /**
+     * Actual request processing. Happens inside appropriate thread pool.
+     *
+     * @param req Request.
+     */
+    private void processRequest(IgfsClientRequest req) {
+        IgfsClientResponse resp;
+
+        try {
+            IgfsClientAbstractCallable target = req.target();
+
+            IgfsImpl igfs = (IgfsImpl) ctx.igfs().igfs(target.igfsName());
+
+            if (igfs == null)
+                throw new IgfsException("IGFS with the given name is not 
configured on the node: " + target.igfsName());
+
+            Object res = target.call0(igfs.context());
+
+            resp = createResponse(req.messageId(), res, null);
+        }
+        catch (Exception e) {
+            // Wrap exception.
+            resp = createResponse(req.messageId(), null, e);
+        }
+
+        // Send response.
+        try {
+            ctx.io().send(req.nodeId(), GridTopic.TOPIC_IGFS_CLI, resp, 
GridIoPolicy.PUBLIC_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send IGFS client response [nodeId=" + 
req.nodeId() +
+                ", msgId=" + req.messageId() + ']', e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsClientManager.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java
new file mode 100644
index 0000000..9e1c3ec
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.client;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.UUID;
+
+/**
+ * IGFS client closure outgoing operation descriptor.
+ */
+public class IgfsClientOutOperation {
+    /** Target node ID. */
+    private final UUID nodeId;
+
+    /** Target operation. */
+    private final IgfsClientAbstractCallable target;
+
+    /** Future completed when operation is ready. */
+    private final IgniteInternalFuture fut;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeId Target node ID.
+     * @param target Target operation.
+     * @param fut Future completed when operation is ready.
+     */
+    public IgfsClientOutOperation(UUID nodeId, IgfsClientAbstractCallable 
target, IgniteInternalFuture fut) {
+        this.nodeId = nodeId;
+        this.target = target;
+        this.fut = fut;
+    }
+
+    /**
+     * @return Target node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Target operation.
+     */
+    public IgfsClientAbstractCallable target() {
+        return target;
+    }
+
+    /**
+     * @return Future completed when operation is ready.
+     */
+    public IgniteInternalFuture future() {
+        return fut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsClientOutOperation.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java
new file mode 100644
index 0000000..49fe523
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.client;
+
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+/**
+ * IGFS client closure execute request.
+ */
+public class IgfsClientRequest implements Message {
+    /** Base fields (all except target) count. */
+    private static final byte BASE_FIELDS_CNT = 3;
+
+    /** Originating node ID. */
+    private UUID nodeId;
+
+    /** Message ID. */
+    private long msgId;
+
+    /** Target callable. */
+    @GridToStringInclude
+    private IgfsClientAbstractCallable target;
+
+    /**
+     * Default constructor.
+     */
+    public IgfsClientRequest() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param nodeId Originating node ID.
+     * @param msgId Message ID.
+     * @param target Target callable.
+     */
+    public IgfsClientRequest(UUID nodeId, long msgId, 
IgfsClientAbstractCallable target) {
+        assert nodeId != null;
+        assert target != null;
+
+        this.nodeId = nodeId;
+        this.msgId = msgId;
+        this.target = target;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Message ID.
+     */
+    public long messageId() {
+        return msgId;
+    }
+
+    /**
+     * @return Target callable.
+     */
+    public IgfsClientAbstractCallable target() {
+        return target;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -27;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return (byte)(BASE_FIELDS_CNT + target.fieldsCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        byte fieldsCount = fieldsCount();
+
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeUuid("nodeId", nodeId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("msgId", msgId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeShort("typeId", target.typeId()))
+                    return false;
+
+                writer.incrementState();
+
+            default:
+                while (writer.state() < fieldsCount) {
+                    if (!target.writeTo(writer, writer.state() - 
BASE_FIELDS_CNT))
+                        return false;
+
+                    writer.incrementState();
+                }
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                nodeId = reader.readUuid("nodeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                msgId = reader.readLong("msgId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                short typeId;
+
+                typeId = reader.readShort("typeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                target = IgfsClientAbstractCallable.callableForTypeId(typeId);
+
+                reader.incrementState();
+
+            default:
+                while (reader.state() < fieldsCount()) {
+                    target.readFrom(reader, reader.state() - BASE_FIELDS_CNT);
+
+                    if (!reader.isLastRead())
+                        return false;
+
+                    reader.incrementState();
+                }
+        }
+
+        return reader.afterMessageRead(IgfsClientRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsClientRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponse.java
new file mode 100644
index 0000000..2fd500f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponse.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.client;
+
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+import java.nio.ByteBuffer;
+
+/**
+ * IGFS client closure execute response.
+ */
+public class IgfsClientResponse implements Message {
+    /** Message ID. */
+    private long msgId;
+
+    /** Response type. */
+    private IgfsClientResponseType typ;
+
+    /** Result. */
+    @GridToStringInclude
+    private Object res;
+
+    /** Result bytes. */
+    @GridToStringExclude
+    private byte[] resBytes;
+
+    /**
+     * Default constructor.
+     */
+    public IgfsClientResponse() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param msgId Message ID.
+     * @param typ Type.
+     * @param res Result.
+     * @param resBytes Result bytes.
+     */
+    public IgfsClientResponse(long msgId, IgfsClientResponseType typ, 
@Nullable Object res,
+        @Nullable byte[] resBytes) {
+        this.msgId = msgId;
+        this.typ = typ;
+        this.res = res;
+        this.resBytes = resBytes;
+    }
+
+    /**
+     * @return Message ID.
+     */
+    public long messageId() {
+        return msgId;
+    }
+
+    /**
+     * @return Type.
+     */
+    public IgfsClientResponseType type() {
+        return typ;
+    }
+
+    /**
+     * @return Result.
+     */
+    @Nullable public Object result() {
+        return res;
+    }
+
+    /**
+     * @return Result bytes.
+     */
+    @Nullable public byte[] resultBytes() {
+        return resBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -28;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return (byte)(typ == IgfsClientResponseType.NULL || typ == 
IgfsClientResponseType.MARSH_ERR ?
+            2 : 3);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        // TODO
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        // TODO
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsClientResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponseType.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponseType.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponseType.java
new file mode 100644
index 0000000..458b5be
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponseType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.client;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * IGFS client closure response type.
+ */
+public enum IgfsClientResponseType {
+    /** Object. */
+    OBJ,
+
+    /** Null result. */
+    NULL,
+
+    /** Boolean result. */
+    BOOL,
+
+    /** Error. */
+    ERR,
+
+    /** Marshalling error. */
+    MARSH_ERR;
+
+    /** Enum values. */
+    private static final IgfsClientResponseType[] VALS = values();
+
+    /**
+     * Efficiently gets enumerated value from its ordinal.
+     *
+     * @param ord Ordinal value.
+     * @return Enumerated value.
+     */
+    @Nullable public static IgfsClientResponseType fromOrdinal(int ord) {
+        return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+    }
+}

Reply via email to