Repository: ignite Updated Branches: refs/heads/ignite-zk 39edd4b56 -> 0cf778363
IGNITE-4835 Visor CMD: Added ability to start cache rebalance. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0dd16a7e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0dd16a7e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0dd16a7e Branch: refs/heads/ignite-zk Commit: 0dd16a7ebdec3d88841250bc19d24cc1430ad3f3 Parents: 592fb33 Author: Alexey Kuznetsov <[email protected]> Authored: Mon Dec 11 15:03:33 2017 +0700 Committer: Alexey Kuznetsov <[email protected]> Committed: Mon Dec 11 15:03:33 2017 +0700 ---------------------------------------------------------------------- .../ignite/visor/commands/VisorConsole.scala | 1 + .../commands/cache/VisorCacheCommand.scala | 126 +++++++++-------- .../cache/VisorCacheRebalanceCommand.scala | 134 +++++++++++++++++++ 3 files changed, 203 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0dd16a7e/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala index ce8b313..0a7bcb0 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala @@ -63,6 +63,7 @@ class VisorConsole { org.apache.ignite.visor.commands.alert.VisorAlertCommand org.apache.ignite.visor.commands.cache.VisorCacheClearCommand org.apache.ignite.visor.commands.cache.VisorCacheResetCommand + org.apache.ignite.visor.commands.cache.VisorCacheRebalanceCommand org.apache.ignite.visor.commands.cache.VisorCacheCommand org.apache.ignite.visor.commands.config.VisorConfigurationCommand org.apache.ignite.visor.commands.deploy.VisorDeployCommand http://git-wip-us.apache.org/repos/asf/ignite/blob/0dd16a7e/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala index d67b65c..fec5a96 100755 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala @@ -40,25 +40,27 @@ import scala.language.{implicitConversions, reflectiveCalls} * * ==Help== * {{{ - * +-----------------------------------------------------------------------------------------+ - * | cache | Prints statistics about caches from specified node on the entire grid. | - * | | Output sorting can be specified in arguments. | - * | | | - * | | Output abbreviations: | - * | | # Number of nodes. | - * | | H/h Number of cache hits. | - * | | M/m Number of cache misses. | - * | | R/r Number of cache reads. | - * | | W/w Number of cache writes. | - * +-----------------------------------------------------------------------------------------+ - * | cache -clear | Clears all entries from cache on all nodes. | - * +-----------------------------------------------------------------------------------------+ - * | cache -scan | List all entries in cache with specified name. | - * +-----------------------------------------------------------------------------------------+ - * | cache -stop | Stop cache with specified name. | - * +-----------------------------------------------------------------------------------------+ - * | cache -reset | Reset metrics for cache with specified name. | - * +-----------------------------------------------------------------------------------------+ + * +-------------------------------------------------------------------------------------------+ + * | cache | Prints statistics about caches from specified node on the entire grid. | + * | | Output sorting can be specified in arguments. | + * | | | + * | | Output abbreviations: | + * | | # Number of nodes. | + * | | H/h Number of cache hits. | + * | | M/m Number of cache misses. | + * | | R/r Number of cache reads. | + * | | W/w Number of cache writes. | + * +-------------------------------------------------------------------------------------------+ + * | cache -clear | Clears all entries from cache on all nodes. | + * +-------------------------------------------------------------------------------------------+ + * | cache -scan | List all entries in cache with specified name. | + * +-------------------------------------------------------------------------------------------+ + * | cache -stop | Stop cache with specified name. | + * +-------------------------------------------------------------------------------------------+ + * | cache -reset | Reset metrics for cache with specified name. | + * +-------------------------------------------------------------------------------------------+ + * | cache -rebalance | Re-balance partitions for cache with specified name. | + * +-------------------------------------------------------------------------------------------+ * * }}} * @@ -71,6 +73,7 @@ import scala.language.{implicitConversions, reflectiveCalls} * cache -scan -c=<cache-name> {-id=<node-id>|id8=<node-id8>} {-p=<page size>} {-system} * cache -stop -c=<cache-name> * cache -reset -c=<cache-name> + * cache -rebalance -c=<cache-name> * }}} * * ====Arguments==== @@ -115,6 +118,8 @@ import scala.language.{implicitConversions, reflectiveCalls} * Stop cache with specified name. * -reset * Reset metrics for cache with specified name. + * -rebalance + * Re-balance partitions for cache with specified name. * -p=<page size> * Number of object to fetch from cache at once. * Valid range from 1 to 100. @@ -151,6 +156,8 @@ import scala.language.{implicitConversions, reflectiveCalls} * Stops cache with name 'cache'. * cache -reset -c=cache * Reset metrics for cache with name 'cache'. + * cache -rebalance -c=cache + * Re-balance partitions for cache with name 'cache'. * * }}} */ @@ -212,7 +219,7 @@ class VisorCacheCommand { else if (!isActive) { warn("Can not perform the operation because the cluster is inactive.", "Note, that the cluster is considered inactive by default if Ignite Persistent Store is used to let all the nodes join the cluster.", - "To activate the cluster execute following command: top -active.") + "To activate the cluster execute following command: top -activate.") } else { var argLst = parseArgs(args) @@ -258,9 +265,10 @@ class VisorCacheCommand { // Get cache stats data from all nodes. val aggrData = cacheData(node, cacheName, showSystem) - if (hasArgFlagIn("clear", "scan", "stop", "reset")) { + if (hasArgFlagIn("clear", "scan", "stop", "reset", "rebalance")) { if (cacheName.isEmpty) - askForCache("Select cache from:", node, showSystem && !hasArgFlagIn("clear", "stop", "reset"), aggrData) match { + askForCache("Select cache from:", node, showSystem + && !hasArgFlagIn("clear", "stop", "reset", "rebalance"), aggrData) match { case Some(name) => argLst = argLst ++ Seq("c" -> name) @@ -270,25 +278,34 @@ class VisorCacheCommand { } cacheName.foreach(name => { - if (hasArgFlag("scan", argLst)) - VisorCacheScanCommand().scan(argLst, node) - else { - if (aggrData.nonEmpty && !aggrData.exists(cache => F.eq(cache.getName, name) && cache.isSystem)) { - if (hasArgFlag("clear", argLst)) - VisorCacheClearCommand().clear(argLst, node) - else if (hasArgFlag("stop", argLst)) - VisorCacheStopCommand().stop(argLst, node) - else if (hasArgFlag("reset", argLst)) - VisorCacheResetCommand().reset(argLst, node) - } - else { - if (hasArgFlag("clear", argLst)) - warn("Clearing of system cache is not allowed: " + name) - else if (hasArgFlag("stop", argLst)) - warn("Stopping of system cache is not allowed: " + name) - else if (hasArgFlag("reset", argLst)) - warn("Reset metrics of system cache is not allowed: " + name) - } + aggrData.find(cache => F.eq(cache.getName, name)) match { + case Some(cache) => + if (!cache.isSystem) { + if (hasArgFlag("scan", argLst)) + VisorCacheScanCommand().scan(argLst, node) + else if (hasArgFlag("clear", argLst)) + VisorCacheClearCommand().clear(argLst, node) + else if (hasArgFlag("stop", argLst)) + VisorCacheStopCommand().stop(argLst, node) + else if (hasArgFlag("reset", argLst)) + VisorCacheResetCommand().reset(argLst, node) + else if (hasArgFlag("rebalance", argLst)) + VisorCacheRebalanceCommand().rebalance(argLst, node) + } + else { + if (hasArgFlag("scan", argLst)) + warn("Scan of system cache is not allowed: " + name) + else if (hasArgFlag("clear", argLst)) + warn("Clearing of system cache is not allowed: " + name) + else if (hasArgFlag("stop", argLst)) + warn("Stopping of system cache is not allowed: " + name) + else if (hasArgFlag("reset", argLst)) + warn("Reset metrics of system cache is not allowed: " + name) + else if (hasArgFlag("rebalance", argLst)) + warn("Re-balance partitions of system cache is not allowed: " + name) + } + case None => + warn("Cache with specified name not found: " + name) } }) @@ -709,7 +726,8 @@ object VisorCacheCommand { "cache -clear {-c=<cache-name>} {-id=<node-id>|id8=<node-id8>}", "cache -scan -c=<cache-name> {-id=<node-id>|id8=<node-id8>} {-p=<page size>}", "cache -stop -c=<cache-name>", - "cache -reset -c=<cache-name>" + "cache -reset -c=<cache-name>", + "cache -rebalance -c=<cache-name>" ), args = Seq( "-id8=<node-id>" -> Seq( @@ -729,21 +747,12 @@ object VisorCacheCommand { "Name of the cache.", "Note you can also use '@c0' ... '@cn' variables as shortcut to <cache-name>." ), - "-clear" -> Seq( - "Clears cache." - ), - "-system" -> Seq( - "Enable showing of information about system caches." - ), - "-scan" -> Seq( - "Prints list of all entries from cache." - ), - "-stop" -> Seq( - "Stop cache with specified name." - ), - "-reset" -> Seq( - "Reset metrics of cache with specified name." - ), + "-clear" -> "Clears cache.", + "-system" -> "Enable showing of information about system caches.", + "-scan" -> "Prints list of all entries from cache.", + "-stop" -> "Stop cache with specified name.", + "-reset" -> "Reset metrics of cache with specified name.", + "-rebalance" -> "Re-balance partitions for cache with specified name.", "-s=hi|mi|rd|wr|cn" -> Seq( "Defines sorting type. Sorted by:", " hi Hits.", @@ -800,7 +809,8 @@ object VisorCacheCommand { " with page of 50 items from all nodes with this cache."), "cache -scan -c=cache -id8=12345678" -> "Prints list entries from cache with name 'cache' and node '12345678' ID8.", "cache -stop -c=@c0" -> "Stop cache with name taken from 'c0' memory variable.", - "cache -reset -c=@c0" -> "Reset metrics for cache with name taken from 'c0' memory variable." + "cache -reset -c=@c0" -> "Reset metrics for cache with name taken from 'c0' memory variable.", + "cache -rebalance -c=cache" -> "Re-balance partitions for cache with name 'cache'." ), emptyArgs = cmd.cache, withArgs = cmd.cache http://git-wip-us.apache.org/repos/asf/ignite/blob/0dd16a7e/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheRebalanceCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheRebalanceCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheRebalanceCommand.scala new file mode 100644 index 0000000..b416e6a --- /dev/null +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheRebalanceCommand.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.visor.commands.cache + +import java.util.{HashSet => JavaSet} + +import org.apache.ignite.cluster.{ClusterGroupEmptyException, ClusterNode} +import org.apache.ignite.internal.visor.cache.{VisorCacheRebalanceTask, VisorCacheRebalanceTaskArg} +import org.apache.ignite.internal.visor.util.VisorTaskUtils._ +import org.apache.ignite.visor.visor._ + +import scala.language.reflectiveCalls + +/** + * ==Overview== + * Visor 'rebalance' command implementation. + * + * ====Specification==== + * {{{ + * cache -rebalance -c=<cache name> + * }}} + * + * ====Arguments==== + * {{{ + * <cache-name> + * Name of the cache. + * }}} + * + * ====Examples==== + * {{{ + * cache -rebalance -c=@c0 + * Re-balance partitions for cache with name taken from 'c0' memory variable. + * }}} + */ +class VisorCacheRebalanceCommand { + /** + * Prints error message and advise. + * + * @param errMsgs Error messages. + */ + private def scold(errMsgs: Any*) { + assert(errMsgs != null) + + warn(errMsgs: _*) + warn("Type 'help cache' to see how to use this command.") + } + + private def error(e: Exception) { + var cause: Throwable = e + + while (cause.getCause != null) + cause = cause.getCause + + scold(cause.getMessage) + } + + /** + * ===Command=== + * Re-balance partitions for cache with specified name. + * + * ===Examples=== + * <ex>cache -c=cache -rebalance</ex> + * Re-balance partitions for cache with name 'cache'. + * + * @param argLst Command arguments. + */ + def rebalance(argLst: ArgList, node: Option[ClusterNode]) { + val cacheArg = argValue("c", argLst) + + val cacheName = cacheArg match { + case None => null // default cache. + + case Some(s) if s.startsWith("@") => + warn("Can't find cache variable with specified name: " + s, + "Type 'cache' to see available cache variables." + ) + + return + + case Some(name) => name + } + + val grp = try { + groupForDataNode(node, cacheName) + } + catch { + case _: ClusterGroupEmptyException => + scold(messageNodeNotFound(node, cacheName)) + + return + } + + try { + val cacheNames = new JavaSet[String]() + cacheNames.add(cacheName) + + executeRandom(grp, classOf[VisorCacheRebalanceTask], new VisorCacheRebalanceTaskArg(cacheNames)) + + println("Visor successfully re-balance partitions for cache: " + escapeName(cacheName)) + } + catch { + case _: ClusterGroupEmptyException => scold(messageNodeNotFound(node, cacheName)) + case e: Exception => error(e) + } + } +} + +/** + * Companion object that does initialization of the command. + */ +object VisorCacheRebalanceCommand { + /** Singleton command. */ + private val cmd = new VisorCacheRebalanceCommand + + /** + * Singleton. + */ + def apply(): VisorCacheRebalanceCommand = cmd +}
