On Thu, Jul 23, 2015 at 07:34:07PM +0200, 'Klaus Aehlig' via ganeti-devel wrote:
Support auto-balancing in maintd. This is done by implementing
the job handling, but importing the actual algorithm from the
htools module.
Signed-off-by: Klaus Aehlig <[email protected]>
---
Makefile.am | 1 +
src/Ganeti/MaintD/Balance.hs | 250 +++++++++++++++++++++++++++++++++++++++++++
src/Ganeti/MaintD/Server.hs | 13 ++-
3 files changed, 262 insertions(+), 2 deletions(-)
create mode 100644 src/Ganeti/MaintD/Balance.hs
diff --git a/Makefile.am b/Makefile.am
index eee2035..3b23f58 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -975,6 +975,7 @@ HS_LIB_SRCS = \
src/Ganeti/Logging/WriterLog.hs \
src/Ganeti/Luxi.hs \
src/Ganeti/MaintD/Autorepairs.hs \
+ src/Ganeti/MaintD/Balance.hs \
src/Ganeti/MaintD/MemoryState.hs \
src/Ganeti/MaintD/Server.hs \
src/Ganeti/MaintD/Utils.hs \
diff --git a/src/Ganeti/MaintD/Balance.hs b/src/Ganeti/MaintD/Balance.hs
new file mode 100644
index 0000000..866518d
--- /dev/null
+++ b/src/Ganeti/MaintD/Balance.hs
@@ -0,0 +1,250 @@
+{-| Balancing task of the maintenance daemon.
+
+This module carries out the automated balancing done by the
+maintenance daemon. The actual balancing algorithm is imported
+from htools.
+
+-}
+{-
+
+Copyright (C) 2015 Google Inc.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+1. Redistributions of source code must retain the above copyright notice,
+this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+-}
+
+module Ganeti.MaintD.Balance
+ ( balanceTask
+ ) where
+
+import Control.Exception.Lifted (bracket)
+import Control.Monad (liftM)
+import Control.Monad.IO.Class (liftIO)
+import qualified Data.Set as Set
+import qualified Data.Map as Map
+import qualified Data.Traversable as Traversable
+import System.IO.Error (tryIOError)
+import Text.Printf (printf)
+
+import Ganeti.BasicTypes ( ResultT, mkResultT, mkResultT'
+ , GenericResult(..), Result)
+import Ganeti.Cpu.Types (emptyCPUavgload, CPUavgload(..))
+import Ganeti.HTools.AlgorithmParams (AlgorithmOptions(..), defaultOptions)
+import qualified Ganeti.HTools.Backend.MonD as MonD
+import qualified Ganeti.HTools.Cluster as Cluster
+import qualified Ganeti.HTools.Cluster.Metrics as Metrics
+import qualified Ganeti.HTools.Cluster.Utils as ClusterUtils
+import qualified Ganeti.HTools.Container as Container
+import qualified Ganeti.HTools.Instance as Instance
+import qualified Ganeti.HTools.Node as Node
+import Ganeti.JQueue (currentTimestamp)
+import Ganeti.JQueue.Objects (Timestamp)
+import Ganeti.Jobs (submitJobs)
+import Ganeti.HTools.Types ( zeroUtil, DynUtil(cpuWeight), addUtil, subUtil
+ , MoveJob)
+import Ganeti.Logging.Lifted (logDebug)
+import Ganeti.MaintD.Utils (annotateOpCode)
+import qualified Ganeti.Luxi as L
+import Ganeti.OpCodes (MetaOpCode)
+import qualified Ganeti.Path as Path
+import qualified Ganeti.Query.Language as Qlang
+import Ganeti.Types (JobId)
+import Ganeti.Utils (logAndBad)
+
+-- * Collection of dynamic load data
+
+data AllReports = AllReports { rTotal :: MonD.Report
+ , rIndividual :: MonD.Report
+ }
+
+-- | Empty report. It describes an indle node and can be used as
[review note] Typo in the comment line above: s/indle/idle/
+-- default value for nodes marked as offline.
+emptyReports :: AllReports
+emptyReports = AllReports (MonD.CPUavgloadReport emptyCPUavgload)
+ (MonD.InstanceCpuReport Map.empty)
+
+-- | Query a node unless it is offline and return all
+-- CPU reports. For offline nodes return the empty report.
+queryNode :: Node.Node -> ResultT String IO AllReports
+queryNode node = do
+ let getReport dc = mkResultT
+ . liftM (maybe (Bad $ "Failed collecting "
+ ++ MonD.dName dc
+ ++ " from " ++ Node.name node) Ok
+ . MonD.mkReport dc)
+ $ MonD.fromCurl dc node
+ if Node.offline node
+ then return emptyReports
+ else do
+ total <- getReport MonD.totalCPUCollector
+ xeninstances <- getReport MonD.xenCPUCollector
+ return $ AllReports total xeninstances
+
+-- | Get a map with the CPU live data for all nodes; for offline nodes
+-- the empty report is guessed.
+queryLoad :: Node.List -> ResultT String IO (Container.Container AllReports)
+queryLoad = Traversable.mapM queryNode
+
+-- | Ask luxid about the hypervisors used. As, at the moment, we only
+-- have specialised CPU collectors for xen, we're only interested in which
+-- instances run under the Xen hypervisor.
+getXenInstances :: ResultT String IO (Set.Set String)
+getXenInstances = do
+ let query = L.Query (Qlang.ItemTypeOpCode Qlang.QRInstance)
+ ["name", "hypervisor"] Qlang.EmptyFilter
+ luxiSocket <- liftIO Path.defaultQuerySocket
+ raw <- bracket (mkResultT . liftM (either (Bad . show) Ok)
+ . tryIOError $ L.getLuxiClient luxiSocket)
+ (liftIO . L.closeClient)
+ $ mkResultT' . L.callMethod query
+ answer <- L.extractArray raw >>= mapM (mapM L.fromJValWithStatus)
+ let getXen [name, hv] | hv `elem` ["xen-pvm", "xen-hvm"] = [name]
+ getXen _ = []
+ return $ Set.fromList (answer >>= getXen)
+
+-- | Update the CPU load of one instance based on the reports.
+-- Fail if instance CPU load is not (yet) available.
+updateCPUInstance :: Node.List
+ -> Container.Container AllReports
+ -> Set.Set String
+ -> Instance.Instance
+ -> Result Instance.Instance
+updateCPUInstance nl reports xeninsts inst =
+ let name = Instance.name inst
+ nidx = Instance.pNode inst
+ in if name `Set.member` xeninsts
+ then let rep = rIndividual $ Container.find nidx reports
+ in case rep of MonD.InstanceCpuReport m | Map.member name m ->
+ return $ inst { Instance.util = zeroUtil {
+ cpuWeight = m Map.! name } }
+ _ | Node.offline $ Container.find nidx nl ->
+ return $ inst { Instance.util = zeroUtil }
+ _ -> fail $ "Xen CPU data unavailable for " ++ name
+ else let rep = rTotal $ Container.find nidx reports
+ in case rep of MonD.CPUavgloadReport (CPUavgload _ _ ndload) ->
+ let w = ndload * fromIntegral (Instance.vcpus inst)
+ / (fromIntegral . Node.uCpu
+ $ Container.find nidx nl)
+ in return $ inst { Instance.util =
+ zeroUtil { cpuWeight = w }}
+ _ -> fail $ "CPU data unavailable for node of " ++ name
+
+-- | Update CPU usage data based on the collected reports. That is, get the
+-- CPU usage of all instances from the reports and also update the nodes
+-- accordingly.
+updateCPULoad :: (Node.List, Instance.List)
+ -> Container.Container AllReports
+ -> Set.Set String
+ -> Result (Node.List, Instance.List)
+updateCPULoad (nl, il) reports xeninsts = do
+ il' <- Traversable.mapM (updateCPUInstance nl reports xeninsts) il
+ let addNodeUtil n delta = n { Node.utilLoad = addUtil (Node.utilLoad n) delta
+ , Node.utilLoadForth =
+ addUtil (Node.utilLoadForth n) delta
+ }
+ let updateNodeUtil nnl inst_old inst_new =
+ let delta = subUtil (Instance.util inst_new) $ Instance.util inst_old
+ nidx = Instance.pNode inst_old
+ n = Container.find nidx nnl
+ n' = addNodeUtil n delta
+ in Container.add nidx n' nnl
+ let nl' = foldl (\nnl i -> updateNodeUtil nnl (Container.find i il)
+ $ Container.find i il') nl $ Container.keys il
+ return (nl', il')
+
+-- * Balancing
+
+-- | Transform an instance move into a submittable job.
+moveToJob :: Timestamp -> (Node.List, Instance.List) -> MoveJob -> [MetaOpCode]
+moveToJob now (nl, il) (_, idx, move, _) =
+ let opCodes = Cluster.iMoveToJob nl il idx move
+ in map (annotateOpCode "auto-balancing the cluster" now) opCodes
+
+-- | Iteratively improve a cluster by iterating over tryBalance.
+iterateBalance :: AlgorithmOptions
+ -> Cluster.Table -- ^ the starting table
+ -> [MoveJob] -- ^ current command list
+ -> [MoveJob] -- ^ resulting commands
+iterateBalance opts ini_tbl cmds =
+ let Cluster.Table ini_nl ini_il _ _ = ini_tbl
+ m_next_tbl = Cluster.tryBalance opts ini_tbl
+ in case m_next_tbl of
+ Just next_tbl@(Cluster.Table _ _ _ plc@(curplc:_)) ->
+ let (idx, _, _, move, _) = curplc
+ plc_len = length plc
+ (_, cs) = Cluster.printSolutionLine ini_nl ini_il 1 1 curplc plc_len
+ afn = Cluster.involvedNodes ini_il curplc
+ cmds' = (afn, idx, move, cs):cmds
+ in iterateBalance opts next_tbl cmds'
+ _ -> cmds
+
+-- | Balance a single group, restricted to the allowed nodes and
+-- minimal gain.
+balanceGroup :: L.Client
+ -> Set.Set Int
+ -> Double
+ -> (Int, (Node.List, Instance.List))
+ -> ResultT String IO [JobId]
+balanceGroup client allowedNodes threshold (gidx, (nl, il)) = do
+ logDebug $ printf "Balancing group %d, %d nodes, %d instances." gidx
+ (Container.size nl) (Container.size il)
+ let ini_cv = Metrics.compCV nl
+ ini_tbl = Cluster.Table nl il ini_cv []
+ opts = defaultOptions { algAllowedNodes = Just allowedNodes
+ , algMinGain = threshold
+ , algMinGainLimit = 10 * threshold
+ }
+ cmds = iterateBalance opts ini_tbl []
+ tasks = take 1 $ Cluster.splitJobs cmds
+ logDebug $ "First task group: " ++ show tasks
+ now <- liftIO currentTimestamp
+ let jobs = tasks >>= map (moveToJob now (nl, il))
+ if null jobs
+ then return []
+ else do
+ jids <- liftIO $ submitJobs jobs client
+ case jids of
+ Bad e -> mkResultT . logAndBad
+ $ "Failure submitting balancing jobs: " ++ e
+ Ok jids' -> return jids'
+
+-- * Interface function
+
+-- | Carry out all the needed balancing, based on live CPU data, only touching
+-- the available nodes. Only carry out balancing steps where the gain is above
+-- the threshold.
+balanceTask :: (Node.List, Instance.List) -- ^ current cluster configuration
+ -> Set.Set Int -- ^ node indices on which actions may be taken
+ -> Double -- ^ threshold for improvement
+ -> ResultT String IO [JobId] -- ^ jobs submitted
+balanceTask (nl, il) okNodes threshold = do
+ logDebug "Collecting dynamic load values"
+ reports <- queryLoad nl
+ xenInstances <- getXenInstances
+ (nl', il') <- mkResultT . return $ updateCPULoad (nl, il) reports xenInstances
+ let ngroups = ClusterUtils.splitCluster nl' il'
+ luxiSocket <- liftIO Path.defaultQuerySocket
+ bracket (liftIO $ L.getLuxiClient luxiSocket) (liftIO . L.closeClient) $ \c ->
+ liftM concat $ mapM (balanceGroup c okNodes threshold) ngroups
diff --git a/src/Ganeti/MaintD/Server.hs b/src/Ganeti/MaintD/Server.hs
index 5a9f118..b220b2d 100644
--- a/src/Ganeti/MaintD/Server.hs
+++ b/src/Ganeti/MaintD/Server.hs
@@ -68,13 +68,15 @@ import Ganeti.Jobs (waitForJobs)
import Ganeti.Logging.Lifted
import qualified Ganeti.Luxi as L
import Ganeti.MaintD.Autorepairs (harepTasks)
+import Ganeti.MaintD.Balance (balanceTask)
import Ganeti.MaintD.MemoryState
import qualified Ganeti.Path as Path
import Ganeti.Runtime (GanetiDaemon(GanetiMaintd))
import Ganeti.Types (JobId(..))
import Ganeti.Utils (threadDelaySeconds)
import Ganeti.Utils.Http (httpConfFromOpts, plainJSON, error404)
-import Ganeti.WConfd.Client (runNewWConfdClient, maintenanceRoundDelay)
+import Ganeti.WConfd.Client ( runNewWConfdClient, maintenanceRoundDelay
+ , maintenanceBalancing)
-- | Options list and functions.
options :: [OptType]
@@ -137,8 +139,15 @@ maintenance memstate = do
(nidxs', jobs) <- harepTasks (nl, il) nidxs
unless (null oldjobs)
. liftIO $ appendJobs memstate jobs
- logDebug $ "Unaffected nodes " ++ show (Set.toList nidxs')
+ logDebug $ "Nodes unaffected by harep " ++ show (Set.toList nidxs')
++ ", jobs submitted " ++ show (map fromJobId jobs)
+ (bal, thresh) <- withErrorT show $ runNewWConfdClient maintenanceBalancing
+ when (bal && not (Set.null nidxs')) $ do
+ logDebug $ "Will balance unaffected nodes, threshold " ++ show thresh
+ jobs' <- balanceTask (nl, il) nidxs thresh
+ logDebug $ "Balancing jobs submitted: " ++ show (map fromJobId jobs')
+ unless (null jobs')
+ . liftIO $ appendJobs memstate jobs'
-- | Expose a part of the memory state
exposeState :: J.JSON a => (MemoryState -> a) -> IORef MemoryState -> Snap ()
--
2.4.3.573.g4eafbef
LGTM