Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 39edd4b56 -> 0cf778363


IGNITE-4835 Visor CMD: Added ability to start cache rebalance.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0dd16a7e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0dd16a7e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0dd16a7e

Branch: refs/heads/ignite-zk
Commit: 0dd16a7ebdec3d88841250bc19d24cc1430ad3f3
Parents: 592fb33
Author: Alexey Kuznetsov <[email protected]>
Authored: Mon Dec 11 15:03:33 2017 +0700
Committer: Alexey Kuznetsov <[email protected]>
Committed: Mon Dec 11 15:03:33 2017 +0700

----------------------------------------------------------------------
 .../ignite/visor/commands/VisorConsole.scala    |   1 +
 .../commands/cache/VisorCacheCommand.scala      | 126 +++++++++--------
 .../cache/VisorCacheRebalanceCommand.scala      | 134 +++++++++++++++++++
 3 files changed, 203 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0dd16a7e/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
----------------------------------------------------------------------
diff --git 
a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
 
b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
index ce8b313..0a7bcb0 100644
--- 
a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
+++ 
b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/VisorConsole.scala
@@ -63,6 +63,7 @@ class VisorConsole {
         org.apache.ignite.visor.commands.alert.VisorAlertCommand
         org.apache.ignite.visor.commands.cache.VisorCacheClearCommand
         org.apache.ignite.visor.commands.cache.VisorCacheResetCommand
+        org.apache.ignite.visor.commands.cache.VisorCacheRebalanceCommand
         org.apache.ignite.visor.commands.cache.VisorCacheCommand
         org.apache.ignite.visor.commands.config.VisorConfigurationCommand
         org.apache.ignite.visor.commands.deploy.VisorDeployCommand

http://git-wip-us.apache.org/repos/asf/ignite/blob/0dd16a7e/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala
----------------------------------------------------------------------
diff --git 
a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala
 
b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala
index d67b65c..fec5a96 100755
--- 
a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala
+++ 
b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala
@@ -40,25 +40,27 @@ import scala.language.{implicitConversions, reflectiveCalls}
  *
  * ==Help==
  * {{{
- * 
+-----------------------------------------------------------------------------------------+
- * | cache          | Prints statistics about caches from specified node on 
the entire grid. |
- * |                | Output sorting can be specified in arguments.            
              |
- * |                |                                                          
              |
- * |                | Output abbreviations:                                    
              |
- * |                |     #   Number of nodes.                                 
              |
- * |                |     H/h Number of cache hits.                            
              |
- * |                |     M/m Number of cache misses.                          
              |
- * |                |     R/r Number of cache reads.                           
              |
- * |                |     W/w Number of cache writes.                          
              |
- * 
+-----------------------------------------------------------------------------------------+
- * | cache -clear   | Clears all entries from cache on all nodes.              
              |
- * 
+-----------------------------------------------------------------------------------------+
- * | cache -scan    | List all entries in cache with specified name.           
              |
- * 
+-----------------------------------------------------------------------------------------+
- * | cache -stop    | Stop cache with specified name.                          
              |
- * 
+-----------------------------------------------------------------------------------------+
- * | cache -reset   | Reset metrics for cache with specified name.             
              |
- * 
+-----------------------------------------------------------------------------------------+
+ * +-------------------------------------------------------------------------------------------+
+ * | cache            | Prints statistics about caches from specified node on the entire grid. |
+ * |                  | Output sorting can be specified in arguments.                          |
+ * |                  |                                                                        |
+ * |                  | Output abbreviations:                                                  |
+ * |                  |     #   Number of nodes.                                               |
+ * |                  |     H/h Number of cache hits.                                          |
+ * |                  |     M/m Number of cache misses.                                        |
+ * |                  |     R/r Number of cache reads.                                         |
+ * |                  |     W/w Number of cache writes.                                        |
+ * +-------------------------------------------------------------------------------------------+
+ * | cache -clear     | Clears all entries from cache on all nodes.                            |
+ * +-------------------------------------------------------------------------------------------+
+ * | cache -scan      | List all entries in cache with specified name.                         |
+ * +-------------------------------------------------------------------------------------------+
+ * | cache -stop      | Stop cache with specified name.                                        |
+ * +-------------------------------------------------------------------------------------------+
+ * | cache -reset     | Reset metrics for cache with specified name.                           |
+ * +-------------------------------------------------------------------------------------------+
+ * | cache -rebalance | Re-balance partitions for cache with specified name.                   |
+ * +-------------------------------------------------------------------------------------------+
  *
  * }}}
  *
@@ -71,6 +73,7 @@ import scala.language.{implicitConversions, reflectiveCalls}
  *     cache -scan -c=<cache-name> {-id=<node-id>|id8=<node-id8>} {-p=<page 
size>} {-system}
  *     cache -stop -c=<cache-name>
  *     cache -reset -c=<cache-name>
+ *     cache -rebalance -c=<cache-name>
  * }}}
  *
  * ====Arguments====
@@ -115,6 +118,8 @@ import scala.language.{implicitConversions, reflectiveCalls}
  *          Stop cache with specified name.
  *     -reset
  *          Reset metrics for cache with specified name.
+ *     -rebalance
+ *          Re-balance partitions for cache with specified name.
  *     -p=<page size>
  *         Number of object to fetch from cache at once.
  *         Valid range from 1 to 100.
@@ -151,6 +156,8 @@ import scala.language.{implicitConversions, reflectiveCalls}
  *         Stops cache with name 'cache'.
  *     cache -reset -c=cache
  *         Reset metrics for cache with name 'cache'.
+ *     cache -rebalance -c=cache
+ *         Re-balance partitions for cache with name 'cache'.
  *
  * }}}
  */
@@ -212,7 +219,7 @@ class VisorCacheCommand {
         else if (!isActive) {
             warn("Can not perform the operation because the cluster is 
inactive.",
                 "Note, that the cluster is considered inactive by default if 
Ignite Persistent Store is used to let all the nodes join the cluster.",
-                "To activate the cluster execute following command: top 
-active.")
+                "To activate the cluster execute following command: top 
-activate.")
         }
         else {
             var argLst = parseArgs(args)
@@ -258,9 +265,10 @@ class VisorCacheCommand {
             // Get cache stats data from all nodes.
             val aggrData = cacheData(node, cacheName, showSystem)
 
-            if (hasArgFlagIn("clear", "scan", "stop", "reset")) {
+            if (hasArgFlagIn("clear", "scan", "stop", "reset", "rebalance")) {
                 if (cacheName.isEmpty)
-                    askForCache("Select cache from:", node, showSystem && 
!hasArgFlagIn("clear", "stop", "reset"), aggrData) match {
+                    askForCache("Select cache from:", node, showSystem
+                        && !hasArgFlagIn("clear", "stop", "reset", 
"rebalance"), aggrData) match {
                         case Some(name) =>
                             argLst = argLst ++ Seq("c" -> name)
 
@@ -270,25 +278,34 @@ class VisorCacheCommand {
                     }
 
                 cacheName.foreach(name => {
-                    if (hasArgFlag("scan", argLst))
-                        VisorCacheScanCommand().scan(argLst, node)
-                    else {
-                        if (aggrData.nonEmpty && !aggrData.exists(cache => 
F.eq(cache.getName, name) && cache.isSystem)) {
-                            if (hasArgFlag("clear", argLst))
-                                VisorCacheClearCommand().clear(argLst, node)
-                            else if (hasArgFlag("stop", argLst))
-                                VisorCacheStopCommand().stop(argLst, node)
-                            else if (hasArgFlag("reset", argLst))
-                              VisorCacheResetCommand().reset(argLst, node)
-                        }
-                        else {
-                            if (hasArgFlag("clear", argLst))
-                                warn("Clearing of system cache is not allowed: 
" + name)
-                            else if (hasArgFlag("stop", argLst))
-                                warn("Stopping of system cache is not allowed: 
" + name)
-                            else if (hasArgFlag("reset", argLst))
-                                warn("Reset metrics of system cache is not 
allowed: " + name)
-                        }
+                    aggrData.find(cache => F.eq(cache.getName, name)) match {
+                        case Some(cache) =>
+                            if (!cache.isSystem) {
+                                if (hasArgFlag("scan", argLst))
+                                    VisorCacheScanCommand().scan(argLst, node)
+                                else if (hasArgFlag("clear", argLst))
+                                    VisorCacheClearCommand().clear(argLst, 
node)
+                                else if (hasArgFlag("stop", argLst))
+                                    VisorCacheStopCommand().stop(argLst, node)
+                                else if (hasArgFlag("reset", argLst))
+                                    VisorCacheResetCommand().reset(argLst, 
node)
+                                else if (hasArgFlag("rebalance", argLst))
+                                    
VisorCacheRebalanceCommand().rebalance(argLst, node)
+                            }
+                            else {
+                                if (hasArgFlag("scan", argLst))
+                                    warn("Scan of system cache is not allowed: 
" + name)
+                                else if (hasArgFlag("clear", argLst))
+                                    warn("Clearing of system cache is not 
allowed: " + name)
+                                else if (hasArgFlag("stop", argLst))
+                                    warn("Stopping of system cache is not 
allowed: " + name)
+                                else if (hasArgFlag("reset", argLst))
+                                    warn("Reset metrics of system cache is not 
allowed: " + name)
+                                else if (hasArgFlag("rebalance", argLst))
+                                    warn("Re-balance partitions of system 
cache is not allowed: " + name)
+                            }
+                        case None =>
+                            warn("Cache with specified name not found: " + 
name)
                     }
                 })
 
@@ -709,7 +726,8 @@ object VisorCacheCommand {
             "cache -clear {-c=<cache-name>} {-id=<node-id>|id8=<node-id8>}",
             "cache -scan -c=<cache-name> {-id=<node-id>|id8=<node-id8>} 
{-p=<page size>}",
             "cache -stop -c=<cache-name>",
-            "cache -reset -c=<cache-name>"
+            "cache -reset -c=<cache-name>",
+            "cache -rebalance -c=<cache-name>"
   ),
         args = Seq(
             "-id8=<node-id>" -> Seq(
@@ -729,21 +747,12 @@ object VisorCacheCommand {
                 "Name of the cache.",
                 "Note you can also use '@c0' ... '@cn' variables as shortcut 
to <cache-name>."
             ),
-            "-clear" -> Seq(
-                "Clears cache."
-            ),
-            "-system" -> Seq(
-                "Enable showing of information about system caches."
-            ),
-            "-scan" -> Seq(
-                "Prints list of all entries from cache."
-            ),
-            "-stop" -> Seq(
-                "Stop cache with specified name."
-            ),
-            "-reset" -> Seq(
-                "Reset metrics of cache with specified name."
-            ),
+            "-clear" -> "Clears cache.",
+            "-system" -> "Enable showing of information about system caches.",
+            "-scan" -> "Prints list of all entries from cache.",
+            "-stop" -> "Stop cache with specified name.",
+            "-reset" -> "Reset metrics of cache with specified name.",
+            "-rebalance" -> "Re-balance partitions for cache with specified 
name.",
             "-s=hi|mi|rd|wr|cn" -> Seq(
                 "Defines sorting type. Sorted by:",
                 "   hi Hits.",
@@ -800,7 +809,8 @@ object VisorCacheCommand {
                 " with page of 50 items from all nodes with this cache."),
             "cache -scan -c=cache -id8=12345678" -> "Prints list entries from 
cache with name 'cache' and node '12345678' ID8.",
             "cache -stop -c=@c0" -> "Stop cache with name taken from 'c0' 
memory variable.",
-            "cache -reset -c=@c0" -> "Reset metrics for cache with name taken 
from 'c0' memory variable."
+            "cache -reset -c=@c0" -> "Reset metrics for cache with name taken 
from 'c0' memory variable.",
+            "cache -rebalance -c=cache" -> "Re-balance partitions for cache 
with name 'cache'."
         ),
         emptyArgs = cmd.cache,
         withArgs = cmd.cache

http://git-wip-us.apache.org/repos/asf/ignite/blob/0dd16a7e/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheRebalanceCommand.scala
----------------------------------------------------------------------
diff --git 
a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheRebalanceCommand.scala
 
b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheRebalanceCommand.scala
new file mode 100644
index 0000000..b416e6a
--- /dev/null
+++ 
b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheRebalanceCommand.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.visor.commands.cache
+
+import java.util.{HashSet => JavaSet}
+
+import org.apache.ignite.cluster.{ClusterGroupEmptyException, ClusterNode}
+import org.apache.ignite.internal.visor.cache.{VisorCacheRebalanceTask, 
VisorCacheRebalanceTaskArg}
+import org.apache.ignite.internal.visor.util.VisorTaskUtils._
+import org.apache.ignite.visor.visor._
+
+import scala.language.reflectiveCalls
+
+/**
+  * ==Overview==
+  * Visor 'rebalance' command implementation.
+  *
+  * ====Specification====
+  * {{{
+  *     cache -rebalance -c=<cache-name>
+  * }}}
+  *
+  * ====Arguments====
+  * {{{
+  *     <cache-name>
+  *         Name of the cache.
+  * }}}
+  *
+  * ====Examples====
+  * {{{
+  *    cache -rebalance -c=@c0
+  *        Re-balance partitions for cache with name taken from 'c0' memory variable.
+  * }}}
+  */
+class VisorCacheRebalanceCommand {
+    /**
+      * Prints error message and advice.
+      *
+      * @param errMsgs Error messages.
+      */
+    private def scold(errMsgs: Any*) {
+        assert(errMsgs != null)
+
+        warn(errMsgs: _*)
+        warn("Type 'help cache' to see how to use this command.")
+    }
+
+    private def error(e: Exception) {
+        var cause: Throwable = e
+
+        while (cause.getCause != null)
+            cause = cause.getCause
+
+        scold(cause.getMessage)
+    }
+
+    /**
+      * ===Command===
+      * Re-balance partitions for cache with specified name.
+      *
+      * ===Examples===
+      * <ex>cache -c=cache -rebalance</ex>
+      * Re-balance partitions for cache with name 'cache'.
+      *
+      * @param argLst Command arguments.
+      */
+    def rebalance(argLst: ArgList, node: Option[ClusterNode]) {
+        val cacheArg = argValue("c", argLst)
+
+        val cacheName = cacheArg match {
+            case None => null // default cache.
+
+            case Some(s) if s.startsWith("@") =>
+                warn("Can't find cache variable with specified name: " + s,
+                    "Type 'cache' to see available cache variables."
+                )
+
+                return
+
+            case Some(name) => name
+        }
+
+        val grp = try {
+            groupForDataNode(node, cacheName)
+        }
+        catch {
+            case _: ClusterGroupEmptyException =>
+                scold(messageNodeNotFound(node, cacheName))
+
+                return
+        }
+
+        try {
+            val cacheNames = new JavaSet[String]()
+            cacheNames.add(cacheName)
+
+            executeRandom(grp, classOf[VisorCacheRebalanceTask], new 
VisorCacheRebalanceTaskArg(cacheNames))
+
+            println("Visor successfully re-balance partitions for cache: " + 
escapeName(cacheName))
+        }
+        catch {
+            case _: ClusterGroupEmptyException => 
scold(messageNodeNotFound(node, cacheName))
+            case e: Exception => error(e)
+        }
+    }
+}
+
+/**
+  * Companion object that does initialization of the command.
+  */
+object VisorCacheRebalanceCommand {
+    /** Singleton command. */
+    private val cmd = new VisorCacheRebalanceCommand
+
+    /**
+      * Singleton.
+      */
+    def apply(): VisorCacheRebalanceCommand = cmd
+}

Reply via email to