Wired up pending worker.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9a28f0df Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9a28f0df Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9a28f0df Branch: refs/heads/ignite-3553 Commit: 9a28f0df79b8e63f0fecf0c2fb83f734f6fc2996 Parents: 5306312 Author: vozerov-gridgain <[email protected]> Authored: Wed Jul 27 16:16:27 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Jul 27 16:16:27 2016 +0300 ---------------------------------------------------------------------- .../igfs/client/IgfsClientManager.java | 61 +++++++++++++++++--- 1 file changed, 52 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9a28f0df/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java index 49246ea..65584fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java @@ -18,12 +18,14 @@ package org.apache.ignite.internal.processors.igfs.client; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -33,7 +35,9 @@ import org.apache.ignite.internal.processors.igfs.IgfsManager; import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import java.util.Map; @@ -49,9 +53,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; * Manager to handle IGFS client closures. */ public class IgfsClientManager extends IgfsManager { - /** Pending input operations received when manager is not started yet. */ - private final ConcurrentLinkedDeque<IgfsClientInOperation> pending = new ConcurrentLinkedDeque<>(); - /** Outgoing operations. */ private final Map<Long, IgfsClientOutOperation> outOps = new ConcurrentHashMap<>(); @@ -74,6 +75,15 @@ public class IgfsClientManager extends IgfsManager { /** IO message listener. */ private final MessageListener msgLsnr = new MessageListener(); + /** Pending input operations received when manager is not started yet. */ + private final ConcurrentLinkedDeque<IgfsClientInOperation> pending = new ConcurrentLinkedDeque<>(); + + /** Worker to process pending requests. */ + private final PendingRequestsWorker pendingWorker = new PendingRequestsWorker(); + + /** Whether pending requests worker started. */ + private boolean pendingWorkerStarted; + /** * Constructor. * @@ -100,7 +110,9 @@ public class IgfsClientManager extends IgfsManager { ready = true; if (!pending.isEmpty()) { - // TODO: Start separate thread. + new IgniteThread(pendingWorker).start(); + + pendingWorkerStarted = true; } } finally { @@ -114,17 +126,23 @@ public class IgfsClientManager extends IgfsManager { ctx.io().removeMessageListener(GridTopic.TOPIC_IGFS_CLI, msgLsnr); + boolean pendingWorkerStarted0; + rwLock.writeLock().lock(); try { stopping = true; + + pendingWorkerStarted0 = pendingWorkerStarted; } finally { rwLock.writeLock().unlock(); } - // Stop pending worker (if any). - // TODO + if (pendingWorkerStarted0) { + U.cancel(pendingWorker); + U.join(pendingWorker, log); + } } /** {@inheritDoc} */ @@ -214,7 +232,7 @@ public class IgfsClientManager extends IgfsManager { processRequest(nodeId, req); // Normal execution flow. else // Add to pending set if manager is not operational yet. - pending.add(new IgfsClientInOperation(nodeId, req)); + pending.addLast(new IgfsClientInOperation(nodeId, req)); } finally { rwLock.readLock().unlock(); @@ -321,7 +339,7 @@ public class IgfsClientManager extends IgfsManager { } /** - * Handles job execution requests. + * Message listener. */ private class MessageListener implements GridMessageListener { /** {@inheritDoc} */ @@ -347,7 +365,7 @@ public class IgfsClientManager extends IgfsManager { switch (evt.type()) { case EVT_NODE_LEFT: case EVT_NODE_FAILED: - DiscoveryEvent evt0 = (DiscoveryEvent)evt; + DiscoveryEvent evt0 = (DiscoveryEvent) evt; onNodeLeft(evt0.eventNode().id()); @@ -355,6 +373,31 @@ public class IgfsClientManager extends IgfsManager { default: assert false : "Unknown event: " + evt; + } + } + } + + /** + * Pending requests worker. + */ + private class PendingRequestsWorker extends GridWorker { + /** + * Consturctor. + * + * @param gridName Grid name. + * @param name WOrker name. + * @param log Logger. + */ + public PendingRequestsWorker(@Nullable String gridName, String name, IgniteLogger log) { + super(gridName, name, log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + IgfsClientInOperation inOp; + + while ((inOp = pending.pollFirst()) != null) + processRequest(inOp.nodeId(), inOp.request()); } } }
