On Fri, Nov 29, 2013 at 8:44 PM, Klaus Aehlig <[email protected]> wrote:
> Make luxid use the job scheduler instead of immediately > starting every received job. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > src/Ganeti/Query/Server.hs | 79 > ++++++++++++++++++++++++---------------------- > 1 file changed, 42 insertions(+), 37 deletions(-) > > diff --git a/src/Ganeti/Query/Server.hs b/src/Ganeti/Query/Server.hs > index 4a3c5ac..9c2239e 100644 > --- a/src/Ganeti/Query/Server.hs > +++ b/src/Ganeti/Query/Server.hs > @@ -53,6 +53,7 @@ import qualified Ganeti.Config as Config > import Ganeti.ConfigReader > import Ganeti.BasicTypes > import Ganeti.JQueue > +import Ganeti.JQScheduler > import Ganeti.Logging > import Ganeti.Luxi > import qualified Ganeti.Query.Language as Qlang > @@ -80,17 +81,18 @@ handleClassicQuery cfg qkind names fields _ = do > return $ showJSON <$> (qr >>= queryCompat) > > -- | Minimal wrapper to handle the missing config case. > -handleCallWrapper :: MVar () -> Result ConfigData > +handleCallWrapper :: MVar () -> JQStatus -> Result ConfigData > -> LuxiOp -> IO (ErrorResult JSValue) > -handleCallWrapper _ (Bad msg) _ = > +handleCallWrapper _ _ (Bad msg) _ = > return . Bad . ConfigurationError $ > "I do not have access to a valid configuration, cannot\ > \ process queries: " ++ msg > -handleCallWrapper qlock (Ok config) op = handleCall qlock config op > +handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat > config op > > -- | Actual luxi operation handler. > -handleCall :: MVar () -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue) > -handleCall _ cdata QueryClusterInfo = > +handleCall :: MVar () -> JQStatus > + -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue) > +handleCall _ _ cdata QueryClusterInfo = > let cluster = configCluster cdata > master = QCluster.clusterMasterNodeName cdata > hypervisors = clusterEnabledHypervisors cluster > @@ -161,7 +163,7 @@ handleCall _ cdata QueryClusterInfo = > Ok _ -> return . Ok . 
J.makeObj $ obj > Bad ex -> return $ Bad ex > > -handleCall _ cfg (QueryTags kind name) = do > +handleCall _ _ cfg (QueryTags kind name) = do > let tags = case kind of > TagKindCluster -> Ok . clusterTags $ configCluster cfg > TagKindGroup -> groupTags <$> Config.getGroup cfg > name > @@ -172,35 +174,35 @@ handleCall _ cfg (QueryTags kind name) = do > ECodeInval > return (J.showJSON <$> tags) > > -handleCall _ cfg (Query qkind qfields qfilter) = do > +handleCall _ _ cfg (Query qkind qfields qfilter) = do > result <- query cfg True (Qlang.Query qkind qfields qfilter) > return $ J.showJSON <$> result > > -handleCall _ _ (QueryFields qkind qfields) = do > +handleCall _ _ _ (QueryFields qkind qfields) = do > let result = queryFields (Qlang.QueryFields qkind qfields) > return $ J.showJSON <$> result > > -handleCall _ cfg (QueryNodes names fields lock) = > +handleCall _ _ cfg (QueryNodes names fields lock) = > handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode) > (map Left names) fields lock > > -handleCall _ cfg (QueryInstances names fields lock) = > +handleCall _ _ cfg (QueryInstances names fields lock) = > handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance) > (map Left names) fields lock > > -handleCall _ cfg (QueryGroups names fields lock) = > +handleCall _ _ cfg (QueryGroups names fields lock) = > handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup) > (map Left names) fields lock > > -handleCall _ cfg (QueryJobs names fields) = > +handleCall _ _ cfg (QueryJobs names fields) = > handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob) > (map (Right . fromIntegral . 
fromJobId) names) fields False > > -handleCall _ cfg (QueryNetworks names fields lock) = > +handleCall _ _ cfg (QueryNetworks names fields lock) = > handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork) > (map Left names) fields lock > > -handleCall qlock cfg (SubmitJobToDrainedQueue ops) = > +handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) = > do > let mcs = Config.getMasterCandidates cfg > jobid <- allocateJobId mcs qlock > @@ -216,17 +218,17 @@ handleCall qlock cfg (SubmitJobToDrainedQueue ops) = > Bad s -> return . Bad . GenericError $ s > Ok () -> do > _ <- replicateManyJobs qDir mcs [job] > - _ <- forkIO $ startJobs [job] > + _ <- forkIO $ enqueueNewJobs qstat[job] > Just a missing space in the line above: `qstat[job]` should be `qstat [job]`. > return . Ok . showJSON . fromJobId $ jid > > -handleCall qlock cfg (SubmitJob ops) = > +handleCall qlock qstat cfg (SubmitJob ops) = > do > open <- isQueueOpen > if not open > then return . Bad . GenericError $ "Queue drained" > - else handleCall qlock cfg (SubmitJobToDrainedQueue ops) > + else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) > > -handleCall qlock cfg (SubmitManyJobs lops) = > +handleCall qlock qstat cfg (SubmitManyJobs lops) = > do > open <- isQueueOpen > if not open > @@ -247,7 +249,7 @@ handleCall qlock cfg (SubmitManyJobs lops) = > when (any isBad write_results) . logWarning > $ "Writing some jobs failed " ++ show annotated_results > replicateManyJobs qDir mcs succeeded > - _ <- forkIO $ startJobs succeeded > + _ <- forkIO $ enqueueNewJobs qstat succeeded > return . Ok . JSArray > . map (\(res, job) -> > if isOk res > @@ -255,7 +257,7 @@ handleCall qlock cfg (SubmitManyJobs lops) = > else showJSON (False, genericResult id (const "") > res)) > $ annotated_results > > -handleCall _ _ op = > +handleCall _ _ _ op = > return . 
Bad $ > GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented") > > @@ -263,11 +265,12 @@ handleCall _ _ op = > > -- | Given a decoded luxi request, executes it and sends the luxi > -- response back to the client. > -handleClientMsg :: MVar () -> Client -> ConfigReader -> LuxiOp -> IO Bool > -handleClientMsg qlock client creader args = do > +handleClientMsg :: MVar () -> JQStatus -> Client -> ConfigReader > + -> LuxiOp -> IO Bool > +handleClientMsg qlock qstat client creader args = do > cfg <- creader > logDebug $ "Request: " ++ show args > - call_result <- handleCallWrapper qlock cfg args > + call_result <- handleCallWrapper qlock qstat cfg args > (!status, !rval) <- > case call_result of > Bad err -> do > @@ -284,8 +287,8 @@ handleClientMsg qlock client creader args = do > > -- | Handles one iteration of the client protocol: receives message, > -- checks it for validity and decodes it, returns response. > -handleClient :: MVar () -> Client -> ConfigReader -> IO Bool > -handleClient qlock client creader = do > +handleClient :: MVar () -> JQStatus -> Client -> ConfigReader -> IO Bool > +handleClient qlock qstat client creader = do > !msg <- recvMsgExt client > logDebug $ "Received message: " ++ show msg > case msg of > @@ -299,27 +302,27 @@ handleClient qlock client creader = do > logWarning errmsg > sendMsg client $ buildResponse False (showJSON errmsg) > return False > - Ok args -> handleClientMsg qlock client creader args > + Ok args -> handleClientMsg qlock qstat client creader args > > -- | Main client loop: runs one loop of 'handleClient', and if that > -- doesn't report a finished (closed) connection, restarts itself. 
> -clientLoop :: MVar () -> Client -> ConfigReader -> IO () > -clientLoop qlock client creader = do > - result <- handleClient qlock client creader > +clientLoop :: MVar () -> JQStatus -> Client -> ConfigReader -> IO () > +clientLoop qlock qstat client creader = do > + result <- handleClient qlock qstat client creader > if result > - then clientLoop qlock client creader > + then clientLoop qlock qstat client creader > else closeClient client > > -- | Main listener loop: accepts clients, forks an I/O thread to handle > -- that client. > -listener :: MVar () -> ConfigReader -> S.Socket -> IO () > -listener qlock creader socket = do > +listener :: MVar () -> JQStatus -> ConfigReader -> S.Socket -> IO () > +listener qlock qstat creader socket = do > client <- acceptClient socket > - _ <- forkIO $ clientLoop qlock client creader > + _ <- forkIO $ clientLoop qlock qstat client creader > return () > > -- | Type alias for prepMain results > -type PrepResult = (FilePath, S.Socket, IORef (Result ConfigData)) > +type PrepResult = (FilePath, S.Socket, IORef (Result ConfigData), > JQStatus) > > -- | Check function for luxid. > checkMain :: CheckFn () > @@ -333,18 +336,20 @@ prepMain _ _ = do > s <- describeError "binding to the Luxi socket" > Nothing (Just socket_path) $ getServer True socket_path > cref <- newIORef (Bad "Configuration not yet loaded") > - return (socket_path, s, cref) > + jq <- emptyJQStatus > + return (socket_path, s, cref, jq) > > -- | Main function. 
> main :: MainFn () PrepResult > -main _ _ (socket_path, server, cref) = do > +main _ _ (socket_path, server, cref, jq) = do > initConfigReader id cref > let creader = readIORef cref > + initJQScheduler jq > > qlockFile <- jobQueueLockFile > lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock" > qlock <- newMVar () > > finally > - (forever $ listener qlock creader server) > + (forever $ listener qlock jq creader server) > (closeServer socket_path server) > -- > 1.8.4.1 > > Otherwise LGTM, Thanks.
