On Thu, Jul 23, 2015 at 07:34:07PM +0200, 'Klaus Aehlig' via ganeti-devel wrote:
Support auto-balancing in maintd. This is done by implementing
the job handling, but importing the actual algorithm from the
htools module.

Signed-off-by: Klaus Aehlig <[email protected]>
---
Makefile.am                  |   1 +
src/Ganeti/MaintD/Balance.hs | 250 +++++++++++++++++++++++++++++++++++++++++++
src/Ganeti/MaintD/Server.hs  |  13 ++-
3 files changed, 262 insertions(+), 2 deletions(-)
create mode 100644 src/Ganeti/MaintD/Balance.hs

diff --git a/Makefile.am b/Makefile.am
index eee2035..3b23f58 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -975,6 +975,7 @@ HS_LIB_SRCS = \
        src/Ganeti/Logging/WriterLog.hs \
        src/Ganeti/Luxi.hs \
        src/Ganeti/MaintD/Autorepairs.hs \
+       src/Ganeti/MaintD/Balance.hs \
        src/Ganeti/MaintD/MemoryState.hs \
        src/Ganeti/MaintD/Server.hs \
        src/Ganeti/MaintD/Utils.hs \
diff --git a/src/Ganeti/MaintD/Balance.hs b/src/Ganeti/MaintD/Balance.hs
new file mode 100644
index 0000000..866518d
--- /dev/null
+++ b/src/Ganeti/MaintD/Balance.hs
@@ -0,0 +1,250 @@
+{-| Balancing task of the maintenance daemon.
+
+This module carries out the automated balancing done by the
+maintenance daemon. The actual balancing algorithm is imported
+from htools.
+
+-}
+{-
+
+Copyright (C) 2015 Google Inc.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+1. Redistributions of source code must retain the above copyright notice,
+this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+-}
+
+module Ganeti.MaintD.Balance
+  ( balanceTask
+  ) where
+
+import Control.Exception.Lifted (bracket)
+import Control.Monad (liftM)
+import Control.Monad.IO.Class (liftIO)
+import qualified Data.Set as Set
+import qualified Data.Map as Map
+import qualified Data.Traversable as Traversable
+import System.IO.Error (tryIOError)
+import Text.Printf (printf)
+
+import Ganeti.BasicTypes ( ResultT, mkResultT, mkResultT'
+                         , GenericResult(..), Result)
+import Ganeti.Cpu.Types (emptyCPUavgload, CPUavgload(..))
+import Ganeti.HTools.AlgorithmParams (AlgorithmOptions(..), defaultOptions)
+import qualified Ganeti.HTools.Backend.MonD as MonD
+import qualified Ganeti.HTools.Cluster as Cluster
+import qualified Ganeti.HTools.Cluster.Metrics as Metrics
+import qualified Ganeti.HTools.Cluster.Utils as ClusterUtils
+import qualified Ganeti.HTools.Container as Container
+import qualified Ganeti.HTools.Instance as Instance
+import qualified Ganeti.HTools.Node as Node
+import Ganeti.JQueue (currentTimestamp)
+import Ganeti.JQueue.Objects (Timestamp)
+import Ganeti.Jobs (submitJobs)
+import Ganeti.HTools.Types ( zeroUtil, DynUtil(cpuWeight), addUtil, subUtil
+                           , MoveJob)
+import Ganeti.Logging.Lifted (logDebug)
+import Ganeti.MaintD.Utils (annotateOpCode)
+import qualified Ganeti.Luxi as L
+import Ganeti.OpCodes (MetaOpCode)
+import qualified Ganeti.Path as Path
+import qualified Ganeti.Query.Language as Qlang
+import Ganeti.Types (JobId)
+import Ganeti.Utils (logAndBad)
+
+-- * Collection of dynamic load data
+
+-- | All CPU-load reports collected from one node: the node-total
+-- CPU load and the per-instance CPU report (used for Xen instances).
+data AllReports = AllReports { rTotal :: MonD.Report
+                               -- ^ total CPU load of the node
+                             , rIndividual :: MonD.Report
+                               -- ^ per-instance CPU load report
+                             }
+
+-- | Empty report. It describes an idle node and can be used as

s/indle/idle

+-- default value for nodes marked as offline.
+-- It consists of a zero total load and an empty per-instance map.
+emptyReports :: AllReports
+emptyReports = AllReports (MonD.CPUavgloadReport emptyCPUavgload)
+                          (MonD.InstanceCpuReport Map.empty)
+
+-- | Query a node unless it is offline and return all
+-- CPU reports. For offline nodes return the empty report.
+queryNode :: Node.Node -> ResultT String IO AllReports
+queryNode node = do
+  -- Fetch one data collector's output from the node via curl and
+  -- parse it into a report; a failed fetch or parse becomes a Bad
+  -- result naming both the collector and the node.
+  let getReport dc = mkResultT
+                     . liftM (maybe (Bad $ "Failed collecting "
+                                           ++ MonD.dName dc
+                                           ++ " from " ++ Node.name node) Ok
+                              . MonD.mkReport dc)
+                     $ MonD.fromCurl dc node
+  if Node.offline node
+    then return emptyReports
+    else do
+      total <- getReport MonD.totalCPUCollector
+      xeninstances <- getReport MonD.xenCPUCollector
+      return $ AllReports total xeninstances
+
+-- | Get a map with the CPU live data for all nodes; for offline nodes
+-- the empty report is guessed. Note that, as this runs in ResultT,
+-- failing to collect from any single online node fails the whole query.
+queryLoad :: Node.List -> ResultT String IO (Container.Container AllReports)
+queryLoad = Traversable.mapM queryNode
+
+-- | Ask luxid about the hypervisors used. As, at the moment, we only
+-- have specialised CPU collectors for Xen, we're only interested in
+-- which instances run under the Xen hypervisor.
+getXenInstances :: ResultT String IO (Set.Set String)
+getXenInstances = do
+  let query = L.Query (Qlang.ItemTypeOpCode Qlang.QRInstance)
+              ["name", "hypervisor"] Qlang.EmptyFilter
+  luxiSocket <- liftIO Path.defaultQuerySocket
+  -- bracket guarantees the luxi client is closed, also on failure.
+  raw <- bracket (mkResultT . liftM (either (Bad . show) Ok)
+                   . tryIOError $ L.getLuxiClient luxiSocket)
+                 (liftIO . L.closeClient)
+                 $ mkResultT' . L.callMethod query
+  answer <- L.extractArray raw >>= mapM (mapM L.fromJValWithStatus)
+  -- Keep only the names of instances reporting a Xen hypervisor.
+  let getXen [name, hv] | hv `elem` ["xen-pvm", "xen-hvm"] = [name]
+      getXen _ = []
+  return $ Set.fromList (answer >>= getXen)
+
+-- | Update the CPU load of one instance based on the reports.
+-- Fail if instance CPU load is not (yet) available.
+updateCPUInstance :: Node.List
+                  -> Container.Container AllReports
+                  -> Set.Set String
+                  -> Instance.Instance
+                  -> Result Instance.Instance
+updateCPUInstance nl reports xeninsts inst =
+  let name = Instance.name inst
+      nidx = Instance.pNode inst
+  in if name `Set.member` xeninsts
+    -- For Xen instances, take the exact value from the per-instance
+    -- report of the primary node, if present there.
+    then let rep = rIndividual $ Container.find nidx reports
+         in case rep of MonD.InstanceCpuReport m | Map.member name m ->
+                          return $ inst { Instance.util = zeroUtil {
+                                             cpuWeight = m Map.! name } }
+                        -- Offline nodes have no data; treat their
+                        -- instances as idle rather than failing.
+                        _ | Node.offline $ Container.find nidx nl ->
+                          return $ inst { Instance.util = zeroUtil }
+                        _ -> fail $ "Xen CPU data unavailable for " ++ name
+    -- For all other instances, approximate from the node-total load,
+    -- scaled by the instance's share of the node's CPUs.
+    else let rep = rTotal $ Container.find nidx reports
+         in case rep of MonD.CPUavgloadReport (CPUavgload _ _ ndload) ->
+                          let w = ndload * fromIntegral (Instance.vcpus inst)
+                                  / (fromIntegral . Node.uCpu
+                                       $ Container.find nidx nl)
+                          in return $ inst { Instance.util =
+                                                zeroUtil { cpuWeight = w }}
+                        _ -> fail $ "CPU data unavailable for node of " ++ name
+
+-- | Update CPU usage data based on the collected reports. That is, get the
+-- CPU usage of all instances from the reports and also update the nodes
+-- accordingly.
+updateCPULoad :: (Node.List, Instance.List)
+              -> Container.Container AllReports
+              -> Set.Set String
+              -> Result (Node.List, Instance.List)
+updateCPULoad (nl, il) reports xeninsts = do
+  il' <- Traversable.mapM (updateCPUInstance nl reports xeninsts) il
+  -- Apply a utilisation delta to both the current and the projected
+  -- load of a node.
+  let addNodeUtil n delta = n { Node.utilLoad = addUtil (Node.utilLoad n) delta
+                              , Node.utilLoadForth =
+                                  addUtil (Node.utilLoadForth n) delta
+                              }
+  -- Shift the primary node's load by the difference between an
+  -- instance's new and old utilisation.
+  let updateNodeUtil nnl inst_old inst_new =
+        let delta = subUtil (Instance.util inst_new) $ Instance.util inst_old
+            nidx = Instance.pNode inst_old
+            n = Container.find nidx nnl
+            n' = addNodeUtil n delta
+        in Container.add nidx n' nnl
+  -- Fold the per-instance deltas over all instances into the node list.
+  let nl' = foldl (\nnl i -> updateNodeUtil nnl (Container.find i il)
+                               $ Container.find i il') nl $ Container.keys il
+  return (nl', il')
+
+-- * Balancing
+
+-- | Transform an instance move into a submittable job.
+moveToJob :: Timestamp -> (Node.List, Instance.List) -> MoveJob -> [MetaOpCode]
+moveToJob now (nl, il) (_, idx, move, _) =
+  let opCodes = Cluster.iMoveToJob nl il idx move
+  -- Annotate every opcode so the resulting job is attributable to
+  -- auto-balancing by the maintenance daemon.
+  in map (annotateOpCode "auto-balancing the cluster" now) opCodes
+
+-- | Iteratively improve a cluster by iterating over tryBalance.
+iterateBalance :: AlgorithmOptions
+               -> Cluster.Table -- ^ the starting table
+               -> [MoveJob]     -- ^ current command list
+               -> [MoveJob]     -- ^ resulting commands
+iterateBalance opts ini_tbl cmds =
+  let Cluster.Table ini_nl ini_il _ _ = ini_tbl
+      m_next_tbl = Cluster.tryBalance opts ini_tbl
+  in case m_next_tbl of
+    -- tryBalance found an improving move: prepend it to the command
+    -- list (hence in reverse order) and recurse on the new table.
+    Just next_tbl@(Cluster.Table _ _ _ plc@(curplc:_)) ->
+      let (idx, _, _, move, _) = curplc
+          plc_len = length plc
+          (_, cs) = Cluster.printSolutionLine ini_nl ini_il 1 1 curplc plc_len
+          afn = Cluster.involvedNodes ini_il curplc
+          cmds' = (afn, idx, move, cs):cmds
+      in iterateBalance opts next_tbl cmds'
+    -- No (further) improvement found: return what we have.
+    _ -> cmds
+
+-- | Balance a single group, restricted to the allowed nodes and
+-- minimal gain. Returns the ids of the jobs submitted, if any.
+balanceGroup :: L.Client
+             -> Set.Set Int
+             -> Double
+             -> (Int,  (Node.List, Instance.List))
+             -> ResultT String IO [JobId]
+balanceGroup client allowedNodes threshold (gidx, (nl, il)) = do
+  logDebug $ printf "Balancing group %d, %d nodes, %d instances." gidx
+               (Container.size nl) (Container.size il)
+  let ini_cv = Metrics.compCV nl
+      ini_tbl = Cluster.Table nl il ini_cv []
+      opts = defaultOptions { algAllowedNodes = Just allowedNodes
+                            , algMinGain = threshold
+                            , algMinGainLimit = 10 * threshold
+                            }
+      cmds = iterateBalance opts ini_tbl []
+      -- Only the first set of independent moves is submitted per
+      -- invocation (take 1); later moves depend on these finishing.
+      tasks = take 1 $ Cluster.splitJobs cmds
+  logDebug $ "First task group: " ++ show tasks
+  now <- liftIO currentTimestamp
+  let jobs = tasks >>= map (moveToJob now (nl, il))
+  if null jobs
+    then return []
+    else do
+      jids <- liftIO $ submitJobs jobs client
+      case jids of
+        Bad e -> mkResultT . logAndBad
+                   $ "Failure submitting balancing jobs: " ++ e
+        Ok jids' -> return jids'
+
+-- * Interface function
+
+-- | Carry out all the needed balancing, based on live CPU data, only touching
+-- the available nodes. Only carry out balancing steps where the gain is above
+-- the threshold.
+balanceTask :: (Node.List, Instance.List) -- ^ current cluster configuration
+            -> Set.Set Int -- ^ node indices on which actions may be taken
+            -> Double -- ^ threshold for improvement
+            -> ResultT String IO [JobId] -- ^ jobs submitted
+balanceTask (nl, il) okNodes threshold = do
+  logDebug "Collecting dynamic load values"
+  -- Collect live CPU reports from all nodes; offline nodes get the
+  -- empty (idle) report.
+  reports <- queryLoad nl
+  xenInstances <- getXenInstances
+  -- Fold the collected data into the node and instance containers.
+  (nl', il') <- mkResultT . return $ updateCPULoad (nl, il) reports xenInstances
+  -- Balance each node group separately.
+  let ngroups = ClusterUtils.splitCluster nl' il'
+  luxiSocket <- liftIO Path.defaultQuerySocket
+  -- bracket guarantees the luxi client is closed, also on failure.
+  bracket (liftIO $ L.getLuxiClient luxiSocket) (liftIO . L.closeClient) $ \c ->
+    liftM concat $ mapM (balanceGroup c okNodes threshold) ngroups
diff --git a/src/Ganeti/MaintD/Server.hs b/src/Ganeti/MaintD/Server.hs
index 5a9f118..b220b2d 100644
--- a/src/Ganeti/MaintD/Server.hs
+++ b/src/Ganeti/MaintD/Server.hs
@@ -68,13 +68,15 @@ import Ganeti.Jobs (waitForJobs)
import Ganeti.Logging.Lifted
import qualified Ganeti.Luxi as L
import Ganeti.MaintD.Autorepairs (harepTasks)
+import Ganeti.MaintD.Balance (balanceTask)
import Ganeti.MaintD.MemoryState
import qualified Ganeti.Path as Path
import Ganeti.Runtime (GanetiDaemon(GanetiMaintd))
import Ganeti.Types (JobId(..))
import Ganeti.Utils (threadDelaySeconds)
import Ganeti.Utils.Http (httpConfFromOpts, plainJSON, error404)
-import Ganeti.WConfd.Client (runNewWConfdClient, maintenanceRoundDelay)
+import Ganeti.WConfd.Client ( runNewWConfdClient, maintenanceRoundDelay
+                            , maintenanceBalancing)

-- | Options list and functions.
options :: [OptType]
@@ -137,8 +139,15 @@ maintenance memstate = do
  (nidxs', jobs) <- harepTasks (nl, il) nidxs
  unless (null oldjobs)
   . liftIO $ appendJobs memstate jobs
-  logDebug $ "Unaffected nodes " ++ show (Set.toList nidxs')
+  logDebug $ "Nodes unaffected by harep " ++ show (Set.toList nidxs')
             ++ ", jobs submitted " ++ show (map fromJobId jobs)
+  (bal, thresh) <- withErrorT show $ runNewWConfdClient maintenanceBalancing
+  when (bal && not (Set.null nidxs')) $ do
+    logDebug $ "Will balance unaffected nodes, threshold " ++ show thresh
+    jobs' <- balanceTask (nl, il) nidxs thresh
+    logDebug $ "Balancing jobs submitted: " ++ show (map fromJobId jobs')
+    unless (null jobs')
+      . liftIO $ appendJobs memstate jobs'

-- | Expose a part of the memory state
exposeState :: J.JSON a => (MemoryState -> a) -> IORef MemoryState -> Snap ()
--
2.4.3.573.g4eafbef


LGTM

Reply via email to