Added response processing logic.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16d42837
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16d42837
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16d42837

Branch: refs/heads/ignite-3553
Commit: 16d42837caf3602805fc3f64ab24782ab7e675e1
Parents: b5322e8
Author: vozerov-gridgain <[email protected]>
Authored: Wed Jul 27 15:51:56 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Wed Jul 27 15:51:56 2016 +0300

----------------------------------------------------------------------
 .../igfs/client/IgfsClientManager.java          | 87 ++++++++++++++++++--
 .../igfs/client/IgfsClientOutOperation.java     |  7 +-
 2 files changed, 86 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/16d42837/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
index 59297bf..9504cf4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.igfs.IgfsContext;
 import org.apache.ignite.internal.processors.igfs.IgfsImpl;
 import org.apache.ignite.internal.processors.igfs.IgfsManager;
@@ -60,6 +61,9 @@ public class IgfsClientManager extends IgfsManager {
     private final StripedCompositeReadWriteLock rwLock =
         new 
StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors() * 2);
 
+    /** IO message listener. */
+    private final MessageListener msgLsnr = new MessageListener();
+
     /**
      * Constructor.
      *
@@ -73,22 +77,28 @@ public class IgfsClientManager extends IgfsManager {
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
-        // TODO
+        ctx.io().addMessageListener(GridTopic.TOPIC_IGFS_CLI, msgLsnr);
+
+        // TODO: Discovery listener.
     }
 
     /** {@inheritDoc} */
     @Override protected void onKernalStart0() throws IgniteCheckedException {
-        // TODO
+        // TODO: Set ready flag.
     }
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
-        // TODO
+        ctx.io().removeMessageListener(GridTopic.TOPIC_IGFS_CLI, msgLsnr);
+
+        // TODO: Discovery listener.
+
+        // TODO: Set stopping flag
     }
 
     /** {@inheritDoc} */
     @Override protected void stop0(boolean cancel) {
-        // TODO
+        // TODO: Cleanup everything.
     }
 
     /**
@@ -182,8 +192,57 @@ public class IgfsClientManager extends IgfsManager {
      *
      * @param resp Response.
      */
+    @SuppressWarnings("unchecked")
     private void onResponse(IgfsClientResponse resp) {
-        // TODO.
+        rwLock.readLock().lock();
+
+        try {
+            IgfsClientOutOperation op = outOps.remove(resp.messageId());
+
+            // Op might be null in case of concurrent local node stop or 
remote node stop (discovery notification).
+            if (op != null) {
+                // Restore result.
+                Object res = null;
+                Throwable err = null;
+
+                try {
+                    switch (resp.type()) {
+                        case BOOL:
+                            res = resp.result();
+
+                            break;
+
+                        case OBJ:
+                            res = marsh.unmarshal(resp.resultBytes(), 
U.resolveClassLoader(ctx.config()));
+
+                            break;
+
+                        case ERR:
+                            err = marsh.unmarshal(resp.resultBytes(), 
U.resolveClassLoader(ctx.config()));
+
+                            break;
+
+                        case MARSH_ERR:
+                            err = new IgfsException("Failed to marshal IGFS 
task result on remote node " +
+                                "(see remote node logs for more information) 
[nodeId + " + op.nodeId() + ']');
+
+                            break;
+
+                        default:
+                            assert resp.type() == IgfsClientResponseType.NULL;
+                    }
+                }
+                catch (Exception e) {
+                    // Something went wrong during unmarshalling.
+                    err = new IgfsException("Failed to unmarshal IGFS task 
result." , e);
+                }
+
+                op.future().onDone(res, err);
+            }
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
     }
 
     /**
@@ -225,4 +284,22 @@ public class IgfsClientManager extends IgfsManager {
     @Override public String toString() {
         return S.toString(IgfsClientManager.class, this);
     }
+
+    /**
+     * Handles incoming IGFS client requests and responses.
+     */
+    private class MessageListener implements GridMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            assert nodeId != null;
+            assert msg != null;
+
+            if (msg instanceof IgfsClientRequest)
+                onRequest((IgfsClientRequest)msg);
+            else if (msg instanceof IgfsClientResponse)
+                onResponse((IgfsClientResponse)msg);
+            else
+                U.error(log, "IGFS client message listener received unknown 
message: " + msg);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16d42837/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java
index 9e1c3ec..03c066b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.igfs.client;
 
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 import java.util.UUID;
@@ -33,7 +34,7 @@ public class IgfsClientOutOperation {
     private final IgfsClientAbstractCallable target;
 
     /** Future completed when operation is ready. */
-    private final IgniteInternalFuture fut;
+    private final GridFutureAdapter fut;
 
     /**
      * Constructor.
@@ -42,7 +43,7 @@ public class IgfsClientOutOperation {
      * @param target Target operation.
      * @param fut Future completed when operation is ready.
      */
-    public IgfsClientOutOperation(UUID nodeId, IgfsClientAbstractCallable 
target, IgniteInternalFuture fut) {
+    public IgfsClientOutOperation(UUID nodeId, IgfsClientAbstractCallable 
target, GridFutureAdapter fut) {
         this.nodeId = nodeId;
         this.target = target;
         this.fut = fut;
@@ -65,7 +66,7 @@ public class IgfsClientOutOperation {
     /**
      * @return Future completed when operation is ready.
      */
-    public IgniteInternalFuture future() {
+    public GridFutureAdapter future() {
         return fut;
     }
 

Reply via email to