The forkJobProcess implementation consists of several steps. Move each
logically self-contained step into a generalized function so that these
code fragments can be reused in forkPostHooksProcess (added in the next
commit).
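
To make the intended reuse concrete, a rough sketch of how
forkPostHooksProcess could be assembled from the new helpers follows.
It is illustration only and not part of this patch: the executable path
P.postHooksExecutorPy, the livelock naming and the parent/child
handshake are assumptions about the follow-up commit; only runProcess
and forkProcessCatchErrors come from this change.

    -- Sketch only (assumed follow-up commit). P.postHooksExecutorPy and
    -- the handshake details are hypothetical.
    forkPostHooksProcess :: (FromString e, Show e)
                         => JobId
                         -> ResultT e IO (FilePath, ProcessID)
    forkPostHooksProcess jid =
      forkProcessCatchErrors childMain logDebugHooks parentMain
      where
        prefix pid = "[start:post-hooks-" ++ show (fromJobId jid)
                     ++ ",pid=" ++ show pid ++ "] "
        logDebugHooks pid = logDebug . (prefix pid ++)

        -- Child side: the same livelock handshake as the job process,
        -- but a different Python executable.
        childMain s = runProcess jid s P.postHooksExecutorPy commFn
          where
            commFn logFn jid' s' = do
              (TOD ts _) <- getClockTime
              lockfile <- P.livelockFile
                          $ printf "post-hooks_%06d_%d" (fromJobId jid') ts
              _ <- logFn $ "Locking livelock file " ++ show lockfile
              fd <- lockFile lockfile
                    >>= annotateResult "Can't lock the livelock"
              _ <- logFn "Sending the lockfile name to the master process"
              sendMsg s' lockfile
              _ <- recvMsg s'
              return fd

        -- Parent side: receive the livelock name, confirm, and let the
        -- post-hooks executable take over.
        parentMain pid master = do
          lockfile <- liftIO $ recvMsg master
          logDebugHooks pid $ "Got the lockfile " ++ lockfile
          liftIO $ sendMsg master ""
          liftIO $ closeClient master
          return (lockfile, pid)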

Signed-off-by: Oleg Ponomarev <[email protected]>
---
 src/Ganeti/Query/Exec.hs | 184 +++++++++++++++++++++++++++--------------------
 1 file changed, 106 insertions(+), 78 deletions(-)

diff --git a/src/Ganeti/Query/Exec.hs b/src/Ganeti/Query/Exec.hs
index 79889ff..8a4b13f 100644
--- a/src/Ganeti/Query/Exec.hs
+++ b/src/Ganeti/Query/Exec.hs
@@ -120,7 +120,6 @@ listOpenFds = liftM filterReadable
     filterReadable :: (Read a) => [String] -> [a]
     filterReadable = mapMaybe (fmap fst . listToMaybe . reads)
 
-
 -- | Catches a potential `IOError` and sets its description via
 -- `annotateIOError`. This makes exceptions more informative when they
 -- are thrown from an unnamed `Handle`.
@@ -128,10 +127,19 @@ rethrowAnnotateIOError :: String -> IO a -> IO a
 rethrowAnnotateIOError desc =
   modifyIOError (\e -> annotateIOError e desc Nothing Nothing)
 
--- Code that is executed in a @fork@-ed process and that the replaces iteself
--- with the actual job process
-runJobProcess :: JobId -> Client -> IO ()
-runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
+
+-- | Code that is executed in a @fork@-ed process. Performs communication with
+-- the parent process by calling commFn and then replaces itself with the
+-- Python executable obtained from pyExecIO.
+runProcess :: JobId -- ^ a job to process
+           -> Client -- ^ UDS transport
+           -> IO FilePath -- ^ path to the python executable
+           -> ((String -> IO ()) -> JobId -> Client -> IO Fd)
+              -- ^ pre-execution function communicating with the parent. The
+              -- function returns the file descriptor which should
+              -- remain open
+           -> IO ()
+runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $
   do
     -- Close the standard error to prevent anything being written there
     -- (for example by exceptions when closing unneeded FDs).
@@ -144,20 +152,7 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
     let logLater _ = return ()
 
     logLater $ "Forking a new process for job " ++ show (fromJobId jid)
-
-    -- Create a livelock file for the job
-    (TOD ts _) <- getClockTime
-    lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid) ts
-
-    -- Lock the livelock file
-    logLater $ "Locking livelock file " ++ show lockfile
-    fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock file"
-    logLater "Sending the lockfile name to the master process"
-    sendMsg s lockfile
-
-    logLater "Waiting for the master process to confirm the lock"
-    _ <- recvMsg s
-
+    preserve_fd <- commFn logLater jid s
     -- close the client
     logLater "Closing the client"
     (clFdR, clFdW) <- clientToFd s
@@ -171,21 +166,20 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
     closeFd clFdR
     closeFd clFdW
 
-    fds <- (filter (> 2) . filter (/= fd)) <$> toErrorBase listOpenFds
+    fds <- (filter (> 2) . filter (/= preserve_fd)) <$> toErrorBase listOpenFds
     logLater $ "Closing every superfluous file descriptor: " ++ show fds
     mapM_ (tryIOError . closeFd) fds
 
-    -- the master process will send the job id and the livelock file name
-    -- using the same protocol to the job process
-    -- we pass the job id as the first argument to the process;
-    -- while the process never uses it, it's very convenient when listing
-    -- job processes
+    -- The master process will send the job id and the livelock file name
+    -- using the same protocol. We pass the job id as the first argument
+    -- to the process. While the process never uses it, it's very convenient
+    -- when listing job processes.
     use_debug <- isDebugMode
     env <- (M.insert "GNT_DEBUG" (if use_debug then "1" else "0")
             . M.insert "PYTHONPATH" AC.versionedsharedir
             . M.fromList)
            `liftM` getEnvironment
-    execPy <- P.jqueueExecutorPy
+    execPy <- pyExecIO
     logLater $ "Executing " ++ AC.pythonPath ++ " " ++ execPy
                ++ " with PYTHONPATH=" ++ AC.versionedsharedir
     () <- executeFile AC.pythonPath True [execPy, show (fromJobId jid)]
@@ -224,6 +218,59 @@ forkWithPipe conf childAction = do
            $ closeClient child
   return (pid, master)
 
+-- | Close the pipe to the client and kill the given job process.
+killProcessOnError :: (FromString e, Show e)
+                   => ProcessID -- ^ job process pid
+                   -> Client -- ^ UDS client connected to the master node
+                   -> (String -> ResultT e (WriterLogT IO) ())
+                      -- ^ log function
+                   -> ResultT e (WriterLogT IO) ()
+killProcessOnError pid master logFn = do
+  logFn "Closing the pipe to the client"
+  withErrorLogAt WARNING "Closing the communication pipe failed"
+                 (liftIO (closeClient master)) `orElse` return ()
+  killIfAlive [sigTERM, sigABRT, sigKILL]
+  where killIfAlive [] = return ()
+        killIfAlive (sig : sigs) = do
+          logFn "Getting the status of the process"
+          status <- tryError . liftIO $ getProcessStatus False True pid
+          case status of
+            Left e -> logFn $ "Job process already gone: " ++ show e
+            Right (Just s) -> logFn $ "Child process status: " ++ show s
+            Right Nothing -> do
+              logFn $ "Child process running, killing by " ++ show sig
+              liftIO $ signalProcess sig pid
+              unless (null sigs) $ do
+                threadDelay 100000 -- wait for 0.1s and check again
+                killIfAlive sigs
+
+-- | Forks the current process, running runFn in the child and commFn in the
+-- parent. Due to a bug in GHC forking process, we want to retry if the forked
+-- process fails to start. If it fails later on, the failure is handled by
+-- 'ResultT' and no retry is performed.
+forkProcessCatchErrors :: (Show e, FromString e)
+                       => (Client -> IO ())
+                       -> (ProcessID -> String -> ResultT e (WriterLogT IO) ())
+                       -> (ProcessID -> Client
+                           -> ResultT e (WriterLogT IO) (FilePath, ProcessID))
+                       -> ResultT e IO (FilePath, ProcessID)
+forkProcessCatchErrors runFn logFn commFn = do
+  -- Due to a bug in GHC forking process, we want to retry
+  -- if the forked process fails to start.
+  -- If it fails later on, the failure is handled by 'ResultT'
+  -- and no retry is performed.
+  let execWriterLogInside = ResultT . execWriterLogT . runResultT
+  retryErrorN C.luxidRetryForkCount
+               $ \tryNo -> execWriterLogInside $ do
+    let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
+    when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
+
+    (pid, master) <- liftIO $ forkWithPipe connectConfig runFn
+
+    logFn pid "Forked a new process"
+    flip catchError (\e -> killProcessOnError pid master (logFn pid)
+                           >> throwError e) $ commFn pid master
+
 -- | Forks the job process and starts processing of the given job.
 -- Returns the livelock of the job and its process ID.
 forkJobProcess :: (FromString e, Show e)
@@ -234,78 +281,59 @@ forkJobProcess :: (FromString e, Show e)
                   -- and process id in the job file
                -> ResultT e IO (FilePath, ProcessID)
 forkJobProcess job luxiLivelock update = do
-  let jidStr = show . fromJobId . qjId $ job
-
-  -- Retrieve secret parameters if present
-  let secretParams = encodeStrict . filterSecretParameters . qjOps $ job
 
   logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock
              ++ " for job " ++ jidStr
   update luxiLivelock
 
-  -- Due to a bug in GHC forking process, we want to retry,
-  -- if the forked process fails to start.
-  -- If it fails later on, the failure is handled by 'ResultT'
-  -- and no retry is performed.
-  let execWriterLogInside = ResultT . execWriterLogT . runResultT
-  retryErrorN C.luxidRetryForkCount
-               $ \tryNo -> execWriterLogInside $ do
-    let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
-    when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
-
-    (pid, master) <- liftIO $ forkWithPipe connectConfig (runJobProcess
-                                                          . qjId $ job)
-
-    let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
-        logDebugJob = logDebug . (jobLogPrefix ++)
-
-    logDebugJob "Forked a new process"
-
-    let killIfAlive [] = return ()
-        killIfAlive (sig : sigs) = do
-          logDebugJob "Getting the status of the process"
-          status <- tryError . liftIO $ getProcessStatus False True pid
-          case status of
-            Left e -> logDebugJob $ "Job process already gone: " ++ show e
-            Right (Just s) -> logDebugJob $ "Child process status: " ++ show s
-            Right Nothing -> do
-                logDebugJob $ "Child process running, killing by " ++ show sig
-                liftIO $ signalProcess sig pid
-                unless (null sigs) $ do
-                  threadDelay 100000 -- wait for 0.1s and check again
-                  killIfAlive sigs
-
-    let onError = do
-          logDebugJob "Closing the pipe to the client"
-          withErrorLogAt WARNING "Closing the communication pipe failed"
-              (liftIO (closeClient master)) `orElse` return ()
-          killIfAlive [sigTERM, sigABRT, sigKILL]
-
-    flip catchError (\e -> onError >> throwError e)
-      $ do
+  forkProcessCatchErrors (childMain . qjId $ job) logDebugJob
+                         parentMain
+  where
+    -- Retrieve secret parameters if present
+    secretParams = encodeStrict . filterSecretParameters . qjOps $ job
+    jidStr = show . fromJobId . qjId $ job
+    jobLogPrefix pid = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
+    logDebugJob pid = logDebug . (jobLogPrefix pid ++)
+
+    -- | Code performing communication with the child process. First, receive
+    -- the livelock name, then send necessary parameters to the Python child.
+    parentMain pid master = do
       let annotatedIO msg k = do
-            logDebugJob msg
-            liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k
+            logDebugJob pid msg
+            liftIO $ rethrowAnnotateIOError (jobLogPrefix pid ++ msg) k
       let recv msg = annotatedIO msg (recvMsg master)
           send msg x = annotatedIO msg (sendMsg master x)
 
       lockfile <- recv "Getting the lockfile of the client"
 
-      logDebugJob $ "Setting the lockfile to the final " ++ lockfile
+      logDebugJob pid ("Setting the lockfile to the final " ++ lockfile)
       toErrorBase $ update lockfile
       send "Confirming the client it can start" ""
 
       -- from now on, we communicate with the job's Python process
-
       _ <- recv "Waiting for the job to ask for the job id"
       send "Writing job id to the client" jidStr
-
       _ <- recv "Waiting for the job to ask for the lock file name"
       send "Writing the lock file name to the client" lockfile
-
       _ <- recv "Waiting for the job to ask for secret parameters"
       send "Writing secret parameters to the client" secretParams
-
       liftIO $ closeClient master
-
       return (lockfile, pid)
+
+    -- | Code performing communication with the parent process. During
+    -- communication the livelock file is created and locked, and its
+    -- name is sent to the parent.
+    childMain jid s = runProcess jid s P.jqueueExecutorPy commFn
+      where
+        commFn logFn jid' s' = do
+          -- Create a livelock file for the job
+          (TOD ts _) <- getClockTime
+          lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid') ts
+          -- Lock the livelock file
+          _ <- logFn $ "Locking livelock file " ++ show lockfile
+          fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock"
+          _ <- logFn "Sending the lockfile name to the master process"
+          sendMsg s' lockfile
+          _ <- logFn "Waiting for the master process to confirm the lock"
+          _ <- recvMsg s'
+          return fd
-- 
2.6.0.rc2.230.g3dd15c0
