On Fri, Nov 29, 2013 at 8:44 PM, Klaus Aehlig <[email protected]> wrote:

> Make luxid use the job scheduler instead of immediately
> starting every received job.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  src/Ganeti/Query/Server.hs | 79
> ++++++++++++++++++++++++----------------------
>  1 file changed, 42 insertions(+), 37 deletions(-)
>
> diff --git a/src/Ganeti/Query/Server.hs b/src/Ganeti/Query/Server.hs
> index 4a3c5ac..9c2239e 100644
> --- a/src/Ganeti/Query/Server.hs
> +++ b/src/Ganeti/Query/Server.hs
> @@ -53,6 +53,7 @@ import qualified Ganeti.Config as Config
>  import Ganeti.ConfigReader
>  import Ganeti.BasicTypes
>  import Ganeti.JQueue
> +import Ganeti.JQScheduler
>  import Ganeti.Logging
>  import Ganeti.Luxi
>  import qualified Ganeti.Query.Language as Qlang
> @@ -80,17 +81,18 @@ handleClassicQuery cfg qkind names fields _ = do
>    return $ showJSON <$> (qr >>= queryCompat)
>
>  -- | Minimal wrapper to handle the missing config case.
> -handleCallWrapper :: MVar () -> Result ConfigData
> +handleCallWrapper :: MVar () -> JQStatus ->  Result ConfigData
>                       -> LuxiOp -> IO (ErrorResult JSValue)
> -handleCallWrapper _ (Bad msg) _ =
> +handleCallWrapper _ _ (Bad msg) _ =
>    return . Bad . ConfigurationError $
>             "I do not have access to a valid configuration, cannot\
>             \ process queries: " ++ msg
> -handleCallWrapper qlock (Ok config) op = handleCall qlock config op
> +handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat
> config op
>
>  -- | Actual luxi operation handler.
> -handleCall :: MVar () -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
> -handleCall _ cdata QueryClusterInfo =
> +handleCall :: MVar () -> JQStatus
> +              -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
> +handleCall _ _ cdata QueryClusterInfo =
>    let cluster = configCluster cdata
>        master = QCluster.clusterMasterNodeName cdata
>        hypervisors = clusterEnabledHypervisors cluster
> @@ -161,7 +163,7 @@ handleCall _ cdata QueryClusterInfo =
>      Ok _ -> return . Ok . J.makeObj $ obj
>      Bad ex -> return $ Bad ex
>
> -handleCall _ cfg (QueryTags kind name) = do
> +handleCall _ _ cfg (QueryTags kind name) = do
>    let tags = case kind of
>                 TagKindCluster  -> Ok . clusterTags $ configCluster cfg
>                 TagKindGroup    -> groupTags <$> Config.getGroup    cfg
> name
> @@ -172,35 +174,35 @@ handleCall _ cfg (QueryTags kind name) = do
>                                          ECodeInval
>    return (J.showJSON <$> tags)
>
> -handleCall _ cfg (Query qkind qfields qfilter) = do
> +handleCall _ _ cfg (Query qkind qfields qfilter) = do
>    result <- query cfg True (Qlang.Query qkind qfields qfilter)
>    return $ J.showJSON <$> result
>
> -handleCall _ _ (QueryFields qkind qfields) = do
> +handleCall _ _ _ (QueryFields qkind qfields) = do
>    let result = queryFields (Qlang.QueryFields qkind qfields)
>    return $ J.showJSON <$> result
>
> -handleCall _ cfg (QueryNodes names fields lock) =
> +handleCall _ _ cfg (QueryNodes names fields lock) =
>    handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
>      (map Left names) fields lock
>
> -handleCall _ cfg (QueryInstances names fields lock) =
> +handleCall _ _ cfg (QueryInstances names fields lock) =
>    handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
>      (map Left names) fields lock
>
> -handleCall _ cfg (QueryGroups names fields lock) =
> +handleCall _ _ cfg (QueryGroups names fields lock) =
>    handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
>      (map Left names) fields lock
>
> -handleCall _ cfg (QueryJobs names fields) =
> +handleCall _ _ cfg (QueryJobs names fields) =
>    handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
>      (map (Right . fromIntegral . fromJobId) names)  fields False
>
> -handleCall _ cfg (QueryNetworks names fields lock) =
> +handleCall _ _ cfg (QueryNetworks names fields lock) =
>    handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
>      (map Left names) fields lock
>
> -handleCall qlock cfg (SubmitJobToDrainedQueue ops) =
> +handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) =
>    do
>      let mcs = Config.getMasterCandidates cfg
>      jobid <- allocateJobId mcs qlock
> @@ -216,17 +218,17 @@ handleCall qlock cfg (SubmitJobToDrainedQueue ops) =
>            Bad s -> return . Bad . GenericError $ s
>            Ok () -> do
>              _ <- replicateManyJobs qDir mcs [job]
> -            _ <- forkIO $ startJobs [job]
> +            _ <- forkIO $ enqueueNewJobs qstat[job]
>

Just a missing space in the line above.


>              return . Ok . showJSON . fromJobId $ jid
>
> -handleCall qlock cfg (SubmitJob ops) =
> +handleCall qlock qstat cfg (SubmitJob ops) =
>    do
>      open <- isQueueOpen
>      if not open
>         then return . Bad . GenericError $ "Queue drained"
> -       else handleCall qlock cfg (SubmitJobToDrainedQueue ops)
> +       else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
>
> -handleCall qlock cfg (SubmitManyJobs lops) =
> +handleCall qlock qstat cfg (SubmitManyJobs lops) =
>    do
>      open <- isQueueOpen
>      if not open
> @@ -247,7 +249,7 @@ handleCall qlock cfg (SubmitManyJobs lops) =
>              when (any isBad write_results) . logWarning
>                $ "Writing some jobs failed " ++ show annotated_results
>              replicateManyJobs qDir mcs succeeded
> -            _ <- forkIO $ startJobs succeeded
> +            _ <- forkIO $ enqueueNewJobs qstat succeeded
>              return . Ok . JSArray
>                . map (\(res, job) ->
>                        if isOk res
> @@ -255,7 +257,7 @@ handleCall qlock cfg (SubmitManyJobs lops) =
>                          else showJSON (False, genericResult id (const "")
> res))
>                $ annotated_results
>
> -handleCall _ _ op =
> +handleCall _ _ _ op =
>    return . Bad $
>      GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
>
> @@ -263,11 +265,12 @@ handleCall _ _ op =
>
>  -- | Given a decoded luxi request, executes it and sends the luxi
>  -- response back to the client.
> -handleClientMsg :: MVar () -> Client -> ConfigReader -> LuxiOp -> IO Bool
> -handleClientMsg qlock client creader args = do
> +handleClientMsg :: MVar () -> JQStatus -> Client -> ConfigReader
> +                   -> LuxiOp -> IO Bool
> +handleClientMsg qlock qstat client creader args = do
>    cfg <- creader
>    logDebug $ "Request: " ++ show args
> -  call_result <- handleCallWrapper qlock cfg args
> +  call_result <- handleCallWrapper qlock qstat cfg args
>    (!status, !rval) <-
>      case call_result of
>        Bad err -> do
> @@ -284,8 +287,8 @@ handleClientMsg qlock client creader args = do
>
>  -- | Handles one iteration of the client protocol: receives message,
>  -- checks it for validity and decodes it, returns response.
> -handleClient :: MVar () -> Client -> ConfigReader -> IO Bool
> -handleClient qlock client creader = do
> +handleClient :: MVar () -> JQStatus -> Client -> ConfigReader -> IO Bool
> +handleClient qlock qstat client creader = do
>    !msg <- recvMsgExt client
>    logDebug $ "Received message: " ++ show msg
>    case msg of
> @@ -299,27 +302,27 @@ handleClient qlock client creader = do
>               logWarning errmsg
>               sendMsg client $ buildResponse False (showJSON errmsg)
>               return False
> -        Ok args -> handleClientMsg qlock client creader args
> +        Ok args -> handleClientMsg qlock qstat client creader args
>
>  -- | Main client loop: runs one loop of 'handleClient', and if that
>  -- doesn't report a finished (closed) connection, restarts itself.
> -clientLoop :: MVar () -> Client -> ConfigReader -> IO ()
> -clientLoop qlock client creader = do
> -  result <- handleClient qlock client creader
> +clientLoop :: MVar () -> JQStatus -> Client -> ConfigReader -> IO ()
> +clientLoop qlock qstat client creader = do
> +  result <- handleClient qlock qstat client creader
>    if result
> -    then clientLoop qlock client creader
> +    then clientLoop qlock qstat client creader
>      else closeClient client
>
>  -- | Main listener loop: accepts clients, forks an I/O thread to handle
>  -- that client.
> -listener :: MVar () -> ConfigReader -> S.Socket -> IO ()
> -listener qlock creader socket = do
> +listener :: MVar () -> JQStatus -> ConfigReader -> S.Socket -> IO ()
> +listener qlock qstat creader socket = do
>    client <- acceptClient socket
> -  _ <- forkIO $ clientLoop qlock client creader
> +  _ <- forkIO $ clientLoop qlock qstat client creader
>    return ()
>
>  -- | Type alias for prepMain results
> -type PrepResult = (FilePath, S.Socket, IORef (Result ConfigData))
> +type PrepResult = (FilePath, S.Socket, IORef (Result ConfigData),
> JQStatus)
>
>  -- | Check function for luxid.
>  checkMain :: CheckFn ()
> @@ -333,18 +336,20 @@ prepMain _ _ = do
>    s <- describeError "binding to the Luxi socket"
>           Nothing (Just socket_path) $ getServer True socket_path
>    cref <- newIORef (Bad "Configuration not yet loaded")
> -  return (socket_path, s, cref)
> +  jq <- emptyJQStatus
> +  return (socket_path, s, cref, jq)
>
>  -- | Main function.
>  main :: MainFn () PrepResult
> -main _ _ (socket_path, server, cref) = do
> +main _ _ (socket_path, server, cref, jq) = do
>    initConfigReader id cref
>    let creader = readIORef cref
> +  initJQScheduler jq
>
>    qlockFile <- jobQueueLockFile
>    lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
>    qlock <- newMVar ()
>
>    finally
> -    (forever $ listener qlock creader server)
> +    (forever $ listener qlock jq creader server)
>      (closeServer socket_path server)
> --
> 1.8.4.1
>
>

Otherwise LGTM, Thanks.

Reply via email to