# IGNITE-737. Rework Visor code to support ClusterGroup.forXXX(cacheName) on daemon node.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2dd74cd8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2dd74cd8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2dd74cd8 Branch: refs/heads/ignite-sprint-4 Commit: 2dd74cd8a6a8d7bfca12025ff030fd2f7d690b9c Parents: 925c45a Author: AKuznetsov <akuznet...@gridgain.com> Authored: Thu Apr 16 15:36:20 2015 +0700 Committer: AKuznetsov <akuznet...@gridgain.com> Committed: Thu Apr 16 15:36:20 2015 +0700 ---------------------------------------------------------------------- .../internal/visor/VisorTaskArgument.java | 2 ++ .../internal/visor/query/VisorQueryArg.java | 22 +++++++------ .../internal/visor/query/VisorQueryJob.java | 34 ++++++++++++++++++++ .../internal/visor/query/VisorQueryTask.java | 19 ++++++----- .../commands/cache/VisorCacheClearCommand.scala | 4 +-- .../commands/cache/VisorCacheCommand.scala | 2 +- .../commands/cache/VisorCacheScanCommand.scala | 4 +-- .../commands/cache/VisorCacheSwapCommand.scala | 4 +-- .../scala/org/apache/ignite/visor/visor.scala | 19 +++++++++++ 9 files changed, 85 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java index 1a4e498..e029678 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.visor; +import org.apache.ignite.cluster.*; + import java.io.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java index 38fac1f..2466868 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.visor.query; +import org.jetbrains.annotations.*; + import java.io.*; import java.util.*; @@ -27,8 +29,8 @@ public class VisorQueryArg implements Serializable { /** */ private static final long serialVersionUID = 0L; - /** Node ID in case of local cache. */ - private final UUID locCacheNodeId; + /** Optional node ID. */ + private final UUID nid; /** Cache name for query. */ private final String cacheName; @@ -37,26 +39,26 @@ public class VisorQueryArg implements Serializable { private final String qryTxt; /** Result batch size. */ - private final Integer pageSize; + private final int pageSize; /** - * @param locCacheNodeId Node ID in case of local cache or {@code null} otherwise. + * @param nid Optional node ID with cache. * @param cacheName Cache name for query. * @param qryTxt Query text. * @param pageSize Result batch size. */ - public VisorQueryArg(UUID locCacheNodeId, String cacheName, String qryTxt, Integer pageSize) { - this.locCacheNodeId = locCacheNodeId; + public VisorQueryArg(@Nullable UUID nid, String cacheName, String qryTxt, int pageSize) { + this.nid = nid; this.cacheName = cacheName; this.qryTxt = qryTxt; this.pageSize = pageSize; } /** - * @return Node ID in case of local cache or {@code null} otherwise. + * @return Optional node ID. */ - public UUID localCacheNodeId() { - return locCacheNodeId; + @Nullable public UUID nodeId() { + return nid; } /** @@ -76,7 +78,7 @@ public class VisorQueryArg implements Serializable { /** * @return Page size. */ - public Integer pageSize() { + public int pageSize() { return pageSize; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java index dcc2242..f5a2746 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.visor.query; import org.apache.ignite.*; import org.apache.ignite.cache.query.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.timeout.*; @@ -32,6 +33,7 @@ import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.internal.visor.query.VisorQueryUtils.*; +import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*; /** * Job for execute SCAN or SQL query and get first page of results. @@ -60,9 +62,41 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten return cacheProcessor.jcache(cacheName); } + /** + * @return Query task class name. + */ + protected Class<? extends VisorQueryTask> task() { + return VisorQueryTask.class; + } + /** {@inheritDoc} */ @Override protected IgniteBiTuple<? extends Exception, VisorQueryResultEx> run(VisorQueryArg arg) { try { + String cacheName = arg.cacheName(); + + UUID nid = ignite.localNode().id(); + + // If node was not specified then we need to check if this node could be used for query + // or we need to send task to appropriate node. + if (arg.nodeId() == null) { + ClusterGroup prj = ignite.cluster().forDataNodes(cacheName); + + if (prj.node() == null) + throw new IgniteException("No data nodes for cache: " + escapeName(cacheName)); + + // Current node does not fit. + if (prj.node(nid) == null) { + Collection<ClusterNode> prjNodes = prj.nodes(); + + Collection<UUID> nids = new ArrayList<>(prjNodes.size()); + + for (ClusterNode node : prjNodes) + nids.add(node.id()); + + return ignite.compute(prj).withNoFailover().execute(task(), new VisorTaskArgument<>(nids, arg, false)); + } + } + boolean scan = arg.queryTxt().toUpperCase().startsWith("SCAN"); String qryId = (scan ? SCAN_QRY_NAME : SQL_QRY_NAME) + "-" + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java index 6683205..2ce011e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.visor.query; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.visor.*; import org.apache.ignite.lang.*; @@ -39,17 +40,19 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg, IgniteBiTupl /** {@inheritDoc} */ @Override protected Map<? extends ComputeJob, ClusterNode> map0(List<ClusterNode> subgrid, VisorTaskArgument<VisorQueryArg> arg) { - String cacheName = taskArg.cacheName(); + String cache = taskArg.cacheName(); ClusterNode node; - if (taskArg.localCacheNodeId() == null) { - ClusterGroup prj = (ignite.cluster().localNode().isDaemon()) - ? ignite.cluster().forRemotes() - : ignite.cluster().forDataNodes(cacheName); + UUID nid = taskArg.nodeId(); + + IgniteClusterEx cluster = ignite.cluster(); + + if (nid == null) { + ClusterGroup prj = cluster.localNode().isDaemon() ? cluster.forRemotes() : cluster.forDataNodes(cache); if (prj.nodes().isEmpty()) - throw new IgniteException("No data nodes for cache: " + escapeName(cacheName)); + throw new IgniteException("No data nodes for cache: " + escapeName(cache)); // First try to take local node to avoid network hop. node = prj.node(ignite.localNode().id()); @@ -59,10 +62,10 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg, IgniteBiTupl node = prj.forRandom().node(); } else { - node = ignite.cluster().node(taskArg.localCacheNodeId()); + node = cluster.node(nid); if (node == null) - throw new IgniteException("No data node for local cache: " + escapeName(cacheName)); + throw new IgniteException("Node not found: " + nid); } assert node != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala index ebdaa34..f401d15 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala @@ -100,10 +100,10 @@ class VisorCacheClearCommand { case Some(name) => name } - val prj = node.fold(ignite.cluster.forRandom())(ignite.cluster.forNode(_)) + val prj = projectionForNode(node) if (prj.nodes().isEmpty) - scold(node.fold("Topology is empty.")(n => "Can't find node with specified id: " + n.id())).^^ + scold(messageNodeNotFound(node)).^^ val t = VisorTextTable() http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala index f45597e..e74cb2c 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala @@ -495,7 +495,7 @@ class VisorCacheCommand { assert(node != null) try { - val prj = node.fold(ignite.cluster.forRemotes())(ignite.cluster.forNode(_)) + val prj = projectionForNode(node) val nids = prj.nodes().map(_.id()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala index 6c2da03..e9c6f6c 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala @@ -136,10 +136,10 @@ class VisorCacheScanCommand { case Some(name) => name } - val n = node.fold(ignite.cluster.forRandom())(ignite.cluster.forNode(_)).node() + val n = projectionForNode(node).node() if (n == null) { - scold(node.fold("Topology is empty.")(n => "Can't find node with specified id: " + n.id())).^^ + scold(messageNodeNotFound(node)).^^ return } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala index 0a20166..5589f8c 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala @@ -104,7 +104,7 @@ class VisorCacheSwapCommand { } - val prj = node.fold(ignite.cluster.forRandom())(ignite.cluster.forNode(_)) + val prj = projectionForNode(node) if (prj.nodes().isEmpty) { val msg = @@ -113,7 +113,7 @@ class VisorCacheSwapCommand { else "Can't find nodes with specified cache: " + cacheName - scold(msg).^^ + scold(messageNodeNotFound(node, Some(msg))).^^ } val t = VisorTextTable() http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala index 431701a..72f8a32 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala @@ -255,6 +255,25 @@ object visor extends VisorTag { } } + /** + * @param node Optional node. + * @return Projection with specified node or projection with random node if specified node is `None`. + */ + def projectionForNode(node: Option[ClusterNode]): ClusterGroup = node match { + case Some(n) => ignite.cluster.forNode(n) + case None => ignite.cluster.forRandom() + } + + /** + * @param node Node. + * @param msg Optional message. + * @return Message about why node was not found. + */ + def messageNodeNotFound(node: Option[ClusterNode], msg: Option[String] = None): String = node match { + case Some(n) => msg.getOrElse("Can't find node with specified id: " + n.id()) + case None => "Topology is empty." + } + Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { try