Added response processing logic.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16d42837 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16d42837 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16d42837 Branch: refs/heads/ignite-3553 Commit: 16d42837caf3602805fc3f64ab24782ab7e675e1 Parents: b5322e8 Author: vozerov-gridgain <[email protected]> Authored: Wed Jul 27 15:51:56 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Jul 27 15:51:56 2016 +0300 ---------------------------------------------------------------------- .../igfs/client/IgfsClientManager.java | 87 ++++++++++++++++++-- .../igfs/client/IgfsClientOutOperation.java | 7 +- 2 files changed, 86 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/16d42837/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java index 59297bf..9504cf4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.processors.igfs.IgfsContext; import org.apache.ignite.internal.processors.igfs.IgfsImpl; import org.apache.ignite.internal.processors.igfs.IgfsManager; @@ -60,6 +61,9 @@ public class IgfsClientManager extends IgfsManager { private final StripedCompositeReadWriteLock rwLock = new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors() * 2); + /** IO message listener. */ + private final MessageListener msgLsnr = new MessageListener(); + /** * Constructor. * @@ -73,22 +77,28 @@ public class IgfsClientManager extends IgfsManager { /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { - // TODO + ctx.io().addMessageListener(GridTopic.TOPIC_IGFS_CLI, msgLsnr); + + // TODO: Discovery listener. } /** {@inheritDoc} */ @Override protected void onKernalStart0() throws IgniteCheckedException { - // TODO + // TODO: Set ready flag. } /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { - // TODO + ctx.io().removeMessageListener(GridTopic.TOPIC_IGFS_CLI, msgLsnr); + + // TODO: Discovery listener. + + // TODO: Set stopping flag } /** {@inheritDoc} */ @Override protected void stop0(boolean cancel) { - // TODO + // TODO: Cleanup everything. } /** @@ -182,8 +192,57 @@ public class IgfsClientManager extends IgfsManager { * * @param resp Response. */ + @SuppressWarnings("unchecked") private void onResponse(IgfsClientResponse resp) { - // TODO. + rwLock.readLock().lock(); + + try { + IgfsClientOutOperation op = outOps.remove(resp.messageId()); + + // Op might be null in case of concurreny local node stop or remote node stop.= discovery notification. + if (op != null) { + // Restore result. + Object res = null; + Throwable err = null; + + try { + switch (resp.type()) { + case BOOL: + res = resp.result(); + + break; + + case OBJ: + res = marsh.unmarshal(resp.resultBytes(), U.resolveClassLoader(ctx.config())); + + break; + + case ERR: + err = marsh.unmarshal(resp.resultBytes(), U.resolveClassLoader(ctx.config())); + + break; + + case MARSH_ERR: + err = new IgfsException("Failed to marshal IGFS task result on remote node " + + "(see remote node logs for more information) [nodeId + " + op.nodeId() + ']'); + + break; + + default: + assert resp.type() == IgfsClientResponseType.NULL; + } + } + catch (Exception e) { + // Something went wrong during unmarshalling. + err = new IgfsException("Failed to unmarshal IGFS task result." , e); + } + + op.future().onDone(res, err); + } + } + finally { + rwLock.readLock().unlock(); + } } /** @@ -225,4 +284,22 @@ public class IgfsClientManager extends IgfsManager { @Override public String toString() { return S.toString(IgfsClientManager.class, this); } + + /** + * Handles job execution requests. + */ + private class MessageListener implements GridMessageListener { + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + assert nodeId != null; + assert msg != null; + + if (msg instanceof IgfsClientRequest) + onRequest((IgfsClientRequest)msg); + else if (msg instanceof IgfsClientResponse) + onResponse((IgfsClientResponse)msg); + else + U.error(log, "IGFS client message listener received unknown message: " + msg); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/16d42837/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java index 9e1c3ec..03c066b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.igfs.client; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.S; import java.util.UUID; @@ -33,7 +34,7 @@ public class IgfsClientOutOperation { private final IgfsClientAbstractCallable target; /** Future completed when operation is ready. */ - private final IgniteInternalFuture fut; + private final GridFutureAdapter fut; /** * Constructor. @@ -42,7 +43,7 @@ public class IgfsClientOutOperation { * @param target Target operation. * @param fut Future completed when operation is ready. */ - public IgfsClientOutOperation(UUID nodeId, IgfsClientAbstractCallable target, IgniteInternalFuture fut) { + public IgfsClientOutOperation(UUID nodeId, IgfsClientAbstractCallable target, GridFutureAdapter fut) { this.nodeId = nodeId; this.target = target; this.fut = fut; @@ -65,7 +66,7 @@ public class IgfsClientOutOperation { /** * @return Future completed when operation is ready. */ - public IgniteInternalFuture future() { + public GridFutureAdapter future() { return fut; }
