The forkJobProcess implementation consists of several steps. Move each logically self-contained step into a generalized function so that these code fragments can be reused by forkPostHooksProcess (which will be added in the next commit).
Signed-off-by: Oleg Ponomarev <[email protected]> --- src/Ganeti/Query/Exec.hs | 184 +++++++++++++++++++++++++++-------------------- 1 file changed, 106 insertions(+), 78 deletions(-) diff --git a/src/Ganeti/Query/Exec.hs b/src/Ganeti/Query/Exec.hs index 79889ff..8a4b13f 100644 --- a/src/Ganeti/Query/Exec.hs +++ b/src/Ganeti/Query/Exec.hs @@ -120,7 +120,6 @@ listOpenFds = liftM filterReadable filterReadable :: (Read a) => [String] -> [a] filterReadable = mapMaybe (fmap fst . listToMaybe . reads) - -- | Catches a potential `IOError` and sets its description via -- `annotateIOError`. This makes exceptions more informative when they -- are thrown from an unnamed `Handle`. @@ -128,10 +127,19 @@ rethrowAnnotateIOError :: String -> IO a -> IO a rethrowAnnotateIOError desc = modifyIOError (\e -> annotateIOError e desc Nothing Nothing) --- Code that is executed in a @fork@-ed process and that the replaces iteself --- with the actual job process -runJobProcess :: JobId -> Client -> IO () -runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $ + +-- | Code that is executed in a @fork@-ed process. Performs communication with +-- the parent process by calling commFn and then runs pyExecIO python +-- executable. +runProcess :: JobId -- ^ a job to process + -> Client -- ^ UDS transport + -> IO FilePath -- ^ path to the python executable + -> ((String -> IO ()) -> JobId -> Client -> IO Fd) + -- ^ pre-execution function communicating with the parent. The + -- function returns the file descriptor which should be + -- remain open + -> IO () +runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $ do -- Close the standard error to prevent anything being written there -- (for example by exceptions when closing unneeded FDs). 
@@ -144,20 +152,7 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $ let logLater _ = return () logLater $ "Forking a new process for job " ++ show (fromJobId jid) - - -- Create a livelock file for the job - (TOD ts _) <- getClockTime - lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid) ts - - -- Lock the livelock file - logLater $ "Locking livelock file " ++ show lockfile - fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock file" - logLater "Sending the lockfile name to the master process" - sendMsg s lockfile - - logLater "Waiting for the master process to confirm the lock" - _ <- recvMsg s - + preserve_fd <- commFn logLater jid s -- close the client logLater "Closing the client" (clFdR, clFdW) <- clientToFd s @@ -171,21 +166,20 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $ closeFd clFdR closeFd clFdW - fds <- (filter (> 2) . filter (/= fd)) <$> toErrorBase listOpenFds + fds <- (filter (> 2) . filter (/= preserve_fd)) <$> toErrorBase listOpenFds logLater $ "Closing every superfluous file descriptor: " ++ show fds mapM_ (tryIOError . closeFd) fds - -- the master process will send the job id and the livelock file name - -- using the same protocol to the job process - -- we pass the job id as the first argument to the process; - -- while the process never uses it, it's very convenient when listing - -- job processes + -- The master process will send the job id and the livelock file name + -- using the same protocol. We pass the job id as the first argument + -- to the process. While the process never uses it, it's very convenient + -- when listing job processes. use_debug <- isDebugMode env <- (M.insert "GNT_DEBUG" (if use_debug then "1" else "0") . M.insert "PYTHONPATH" AC.versionedsharedir . 
M.fromList) `liftM` getEnvironment - execPy <- P.jqueueExecutorPy + execPy <- pyExecIO logLater $ "Executing " ++ AC.pythonPath ++ " " ++ execPy ++ " with PYTHONPATH=" ++ AC.versionedsharedir () <- executeFile AC.pythonPath True [execPy, show (fromJobId jid)] @@ -224,6 +218,59 @@ forkWithPipe conf childAction = do $ closeClient child return (pid, master) +-- | Kill the process with the id provided. +killProcessOnError :: (FromString e, Show e) + => ProcessID -- ^ job process pid + -> Client -- ^ UDS client connected to the master node + -> (String -> ResultT e (WriterLogT IO) ()) + -- ^ log function + -> ResultT e (WriterLogT IO) () +killProcessOnError pid master logFn = do + logFn "Closing the pipe to the client" + withErrorLogAt WARNING "Closing the communication pipe failed" + (liftIO (closeClient master)) `orElse` return () + killIfAlive [sigTERM, sigABRT, sigKILL] + where killIfAlive [] = return () + killIfAlive (sig : sigs) = do + logFn "Getting the status of the process" + status <- tryError . liftIO $ getProcessStatus False True pid + case status of + Left e -> logFn $ "Job process already gone: " ++ show e + Right (Just s) -> logFn $ "Child process status: " ++ show s + Right Nothing -> do + logFn $ "Child process running, killing by " ++ show sig + liftIO $ signalProcess sig pid + unless (null sigs) $ do + threadDelay 100000 -- wait for 0.1s and check again + killIfAlive sigs + +-- | Forks current process and running runFn in the child and commFn in the +-- parent. Due to a bug in GHC forking process, we want to retry if the forked +-- process fails to start. If it fails later on, the failure is handled by +-- 'ResultT' and no retry is performed. 
+forkProcessCatchErrors :: (Show e, FromString e) + => (Client -> IO ()) + -> (ProcessID -> String -> ResultT e (WriterLogT IO) ()) + -> (ProcessID -> Client + -> ResultT e (WriterLogT IO) (FilePath, ProcessID)) + -> ResultT e IO (FilePath, ProcessID) +forkProcessCatchErrors runFn logFn commFn = do + -- Due to a bug in GHC forking process, we want to retry + -- if the forked process fails to start. + -- If it fails later on, the failure is handled by 'ResultT' + -- and no retry is performed. + let execWriterLogInside = ResultT . execWriterLogT . runResultT + retryErrorN C.luxidRetryForkCount + $ \tryNo -> execWriterLogInside $ do + let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS + when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS) + + (pid, master) <- liftIO $ forkWithPipe connectConfig runFn + + logFn pid "Forked a new process" + flip catchError (\e -> killProcessOnError pid master (logFn pid) + >> throwError e) $ commFn pid master + -- | Forks the job process and starts processing of the given job. -- Returns the livelock of the job and its process ID. forkJobProcess :: (FromString e, Show e) @@ -234,78 +281,59 @@ forkJobProcess :: (FromString e, Show e) -- and process id in the job file -> ResultT e IO (FilePath, ProcessID) forkJobProcess job luxiLivelock update = do - let jidStr = show . fromJobId . qjId $ job - - -- Retrieve secret parameters if present - let secretParams = encodeStrict . filterSecretParameters . qjOps $ job logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock ++ " for job " ++ jidStr update luxiLivelock - -- Due to a bug in GHC forking process, we want to retry, - -- if the forked process fails to start. - -- If it fails later on, the failure is handled by 'ResultT' - -- and no retry is performed. - let execWriterLogInside = ResultT . execWriterLogT . runResultT - retryErrorN C.luxidRetryForkCount - $ \tryNo -> execWriterLogInside $ do - let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS - when (tryNo >= 2) . 
liftIO $ delayRandom (0, maxWaitUS) - - (pid, master) <- liftIO $ forkWithPipe connectConfig (runJobProcess - . qjId $ job) - - let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] " - logDebugJob = logDebug . (jobLogPrefix ++) - - logDebugJob "Forked a new process" - - let killIfAlive [] = return () - killIfAlive (sig : sigs) = do - logDebugJob "Getting the status of the process" - status <- tryError . liftIO $ getProcessStatus False True pid - case status of - Left e -> logDebugJob $ "Job process already gone: " ++ show e - Right (Just s) -> logDebugJob $ "Child process status: " ++ show s - Right Nothing -> do - logDebugJob $ "Child process running, killing by " ++ show sig - liftIO $ signalProcess sig pid - unless (null sigs) $ do - threadDelay 100000 -- wait for 0.1s and check again - killIfAlive sigs - - let onError = do - logDebugJob "Closing the pipe to the client" - withErrorLogAt WARNING "Closing the communication pipe failed" - (liftIO (closeClient master)) `orElse` return () - killIfAlive [sigTERM, sigABRT, sigKILL] - - flip catchError (\e -> onError >> throwError e) - $ do + forkProcessCatchErrors (childMain . qjId $ job) logDebugJob + parentMain + where + -- Retrieve secret parameters if present + secretParams = encodeStrict . filterSecretParameters . qjOps $ job + jidStr = show . fromJobId . qjId $ job + jobLogPrefix pid = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] " + logDebugJob pid = logDebug . (jobLogPrefix pid ++) + + -- | Code performing communication with the child process. First, receive + -- the livelock, then send necessary parameters to the python child. 
+ parentMain pid master = do let annotatedIO msg k = do - logDebugJob msg - liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k + logDebugJob pid msg + liftIO $ rethrowAnnotateIOError (jobLogPrefix pid ++ msg) k let recv msg = annotatedIO msg (recvMsg master) send msg x = annotatedIO msg (sendMsg master x) lockfile <- recv "Getting the lockfile of the client" - logDebugJob $ "Setting the lockfile to the final " ++ lockfile + logDebugJob pid ("Setting the lockfile to the final " ++ lockfile) toErrorBase $ update lockfile send "Confirming the client it can start" "" -- from now on, we communicate with the job's Python process - _ <- recv "Waiting for the job to ask for the job id" send "Writing job id to the client" jidStr - _ <- recv "Waiting for the job to ask for the lock file name" send "Writing the lock file name to the client" lockfile - _ <- recv "Waiting for the job to ask for secret parameters" send "Writing secret parameters to the client" secretParams - liftIO $ closeClient master - return (lockfile, pid) + + -- | Code performing communication with the parent process. During + -- communication the livelock is created, locked and sent back + -- to the parent. + childMain jid s = runProcess jid s P.jqueueExecutorPy commFn + where + commFn logFn jid' s' = do + -- Create a livelock file for the job + (TOD ts _) <- getClockTime + lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid') ts + -- Lock the livelock file + _ <- logFn $ "Locking livelock file " ++ show lockfile + fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock" + _ <- logFn "Sending the lockfile name to the master process" + sendMsg s' lockfile + _ <- logFn "Waiting for the master process to confirm the lock" + _ <- recvMsg s' + return fd -- 2.6.0.rc2.230.g3dd15c0
