On Fri, Apr 11, 2014 at 12:43 PM, Klaus Aehlig <[email protected]> wrote:

> In this way, we will be able to support in WConfD waiting for locks
> to become available instead of having to poll for them.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  src/Ganeti/Locking/Locks.hs         | 28 ++++++++++++------------
>  src/Ganeti/WConfd/Core.hs           | 17 ++++++---------
>  src/Ganeti/WConfd/DeathDetection.hs |  3 ++-
>  src/Ganeti/WConfd/Monad.hs          | 43 ++++++++++++++++++++++---------------
>  src/Ganeti/WConfd/Server.hs         |  4 ++--
>  5 files changed, 51 insertions(+), 44 deletions(-)
>
> diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs
> index 4be5bea..e9080fb 100644
> --- a/src/Ganeti/Locking/Locks.hs
> +++ b/src/Ganeti/Locking/Locks.hs
> @@ -30,7 +30,7 @@ module Ganeti.Locking.Locks
>    , lockName
>    , ClientType(..)
>    , ClientId(..)
> -  , GanetiLockAllocation
> +  , GanetiLockWaiting
>    , loadLockAllocation
>    , writeLocksAsyncTask
>    , LockLevel(..)
> @@ -49,8 +49,8 @@ import qualified Text.JSON as J
>  import Ganeti.BasicTypes
>  import Ganeti.Errors (ResultG, GanetiException)
>  import Ganeti.JSON (readEitherString, fromJResultE)
> -import Ganeti.Locking.Allocation
>  import Ganeti.Locking.Types
> +import Ganeti.Locking.Waiting
>  import Ganeti.Logging.Lifted (MonadLog, logDebug, logEmergency)
>  import Ganeti.Types
>  import Ganeti.Utils.Atomic
> @@ -230,33 +230,33 @@ instance J.JSON ClientId where
>
>  -- | The type of lock Allocations in Ganeti. In Ganeti, the owner of
>  -- locks are jobs.
> -type GanetiLockAllocation = LockAllocation GanetiLocks ClientId
> +type GanetiLockWaiting = LockWaiting GanetiLocks ClientId Integer
>
>  -- | Load a lock allocation from disk.
> -loadLockAllocation :: FilePath -> ResultG GanetiLockAllocation
> +loadLockAllocation :: FilePath -> ResultG GanetiLockWaiting
>  loadLockAllocation =
>    liftIO . readFile
> -  >=> fromJResultE "parsing lock allocation" . J.decodeStrict
> +  >=> fromJResultE "parsing lock waiting structure" . J.decodeStrict
>
>  -- | Write lock allocation to disk, overwriting any previously lock
>  -- allocation stored there.
>

Just nitpicking: s/previously/previous/


>  writeLocks :: (MonadBase IO m, MonadError GanetiException m, MonadLog m)
> -           => FilePath -> GanetiLockAllocation -> m ()
> -writeLocks fpath lockAlloc = do
> -  logDebug "Async. lock allocation writer: Starting write"
> -  toErrorBase . liftIO . atomicWriteFile fpath $ J.encode lockAlloc
> -  logDebug "Async. lock allocation writer: written"
> +           => FilePath -> GanetiLockWaiting -> m ()
> +writeLocks fpath lockWait = do
> +  logDebug "Async. lock status writer: Starting write"
> +  toErrorBase . liftIO . atomicWriteFile fpath $ J.encode lockWait
> +  logDebug "Async. lock status writer: written"
>
>  -- | Construct an asynchronous worker whose action is to save the
>  -- current state of the lock allocation.
>  -- The worker's action reads the lock allocation using the given @IO@
>  -- action. Any inbetween changes to the file are tacitly ignored.
>  writeLocksAsyncTask :: FilePath -- ^ Path to the lock file
> -                    -> IO GanetiLockAllocation -- ^ An action to read the
> -                                               -- current lock allocation
> +                    -> IO GanetiLockWaiting -- ^ An action to read the
> +                                            -- current lock allocation
>                      -> ResultG (AsyncWorker ())
> -writeLocksAsyncTask fpath lockAllocAction = mkAsyncWorker $
> +writeLocksAsyncTask fpath lockWaitingAction = mkAsyncWorker $
>    catchError (do
> -    locks <- liftBase lockAllocAction
> +    locks <- liftBase lockWaitingAction
>      writeLocks fpath locks
>    ) (logEmergency . (++) "Can't write lock allocation status: " . show)
> diff --git a/src/Ganeti/WConfd/Core.hs b/src/Ganeti/WConfd/Core.hs
> index f751913..e008b5f 100644
> --- a/src/Ganeti/WConfd/Core.hs
> +++ b/src/Ganeti/WConfd/Core.hs
> @@ -41,6 +41,7 @@ import qualified Ganeti.JSON as J
>  import qualified Ganeti.Locking.Allocation as L
>  import Ganeti.Locking.Locks ( GanetiLocks(ConfigLock), LockLevel(LevelConfig)
>                              , lockLevel, LockLevel, ClientId )
> +import qualified Ganeti.Locking.Waiting as LW
>  import Ganeti.Objects (ConfigData)
>  import Ganeti.WConfd.Language
>  import Ganeti.WConfd.Monad
> @@ -124,38 +125,34 @@ tryUpdateLocks :: ClientId -> GanetiLockRequest -> WConfdMonad [ClientId]
>  tryUpdateLocks cid req =
>    liftM S.toList
>    . (>>= toErrorStr)
> -  $ modifyLockAllocation (L.updateLocks cid (fromGanetiLockRequest req))
> +  $ modifyLockWaiting (LW.updateLocks cid (fromGanetiLockRequest req))
>
>  -- | Free all locks of a given owner (i.e., a job-id lockfile pair).
>  freeLocks :: ClientId -> WConfdMonad ()
>  freeLocks cid =
> -  modifyLockAllocation_ (`L.freeLocks` cid)
> +  modifyLockWaiting_ $ LW.releaseResources cid
>
>  -- | Free all locks of a given owner (i.e., a job-id lockfile pair)
>  -- of a given level in the Ganeti sense (e.g., "cluster", "node").
>  freeLocksLevel :: ClientId -> LockLevel -> WConfdMonad ()
>  freeLocksLevel cid level =
> -  modifyLockAllocation_ (L.freeLocksPredicate ((==) level . lockLevel)
> -                           `flip` cid)
> +  modifyLockWaiting_ $ LW.freeLocksPredicate ((==) level . lockLevel) cid
>
>  -- | Downgrade all locks of the given level to shared.
>  downGradeLocksLevel :: ClientId -> LockLevel -> WConfdMonad ()
>  downGradeLocksLevel cid level =
> -  modifyLockAllocation_ $ L.downGradePredicate ((==) level . lockLevel) cid
> +  modifyLockWaiting_ $ LW.downGradeLocksPredicate ((==) level . lockLevel) cid
>
>  -- | Intersect the possesed locks of an owner with a given set.
>  intersectLocks :: ClientId -> [GanetiLocks] -> WConfdMonad ()
> -intersectLocks cid =
> - modifyLockAllocation_ . L.intersectLocks cid
> +intersectLocks cid locks = modifyLockWaiting_ $ LW.intersectLocks locks cid
>
>  -- | Opportunistically allocate locks for a given owner.
>  opportunisticLockUnion :: ClientId
>                         -> [(GanetiLocks, L.OwnerState)]
>                         -> WConfdMonad [GanetiLocks]
>  opportunisticLockUnion cid req =
> -  liftM S.toList
> -  . modifyLockAllocation
> -  $ L.opportunisticLockUnion cid req
> +  modifyLockWaiting $ LW.opportunisticLockUnion cid req
>
>  -- * The list of all functions exported to RPC.
>
> diff --git a/src/Ganeti/WConfd/DeathDetection.hs b/src/Ganeti/WConfd/DeathDetection.hs
> index e5d8a0b..414420b 100644
> --- a/src/Ganeti/WConfd/DeathDetection.hs
> +++ b/src/Ganeti/WConfd/DeathDetection.hs
> @@ -45,6 +45,7 @@ import System.Posix.IO
>  import Ganeti.BasicTypes
>  import qualified Ganeti.Constants as C
>  import qualified Ganeti.Locking.Allocation as L
> +import qualified Ganeti.Locking.Waiting as LW
>  import Ganeti.Locking.Locks (ClientId(..))
>  import Ganeti.Logging.Lifted (logDebug, logInfo)
>  import Ganeti.WConfd.Monad
> @@ -76,7 +77,7 @@ cleanupLocksTask = forever . runResultT $ do
>          died <- liftIO (isDead fpath)
>          when died $ do
>            logInfo $ show owner ++ " died, releasing locks"
> -          modifyLockAllocation_ (`L.freeLocks` owner)
> +          modifyLockWaiting_ (LW.releaseResources owner)
>            _ <- liftIO . try $ removeFile fpath
>                 :: WConfdMonad (Either IOError ())
>            return ()
> diff --git a/src/Ganeti/WConfd/Monad.hs b/src/Ganeti/WConfd/Monad.hs
> index 47e0219..4a676f3 100644
> --- a/src/Ganeti/WConfd/Monad.hs
> +++ b/src/Ganeti/WConfd/Monad.hs
> @@ -44,26 +44,29 @@ module Ganeti.WConfd.Monad
>    , daemonHandle
>    , modifyConfigState
>    , readConfigState
> -  , modifyLockAllocation
> -  , modifyLockAllocation_
> +  , modifyLockWaiting
> +  , modifyLockWaiting_
>    , readLockAllocation
>    ) where
>
>  import Control.Applicative
> -import Control.Arrow ((&&&))
> +import Control.Arrow ((&&&), second)
>  import Control.Monad
>  import Control.Monad.Base
>  import Control.Monad.Error
>  import Control.Monad.Reader
>  import Control.Monad.Trans.Control
>  import Data.IORef.Lifted
> +import qualified Data.Set as S
>  import Data.Tuple (swap)
>  import qualified Text.JSON as J
>
>  import Ganeti.BasicTypes
>  import Ganeti.Errors
>  import Ganeti.Lens
> +import Ganeti.Locking.Allocation (LockAllocation)
>  import Ganeti.Locking.Locks
> +import Ganeti.Locking.Waiting (getAllocation)
>  import Ganeti.Logging
>  import Ganeti.Utils.AsyncWorker
>  import Ganeti.WConfd.ConfigState
> @@ -74,7 +77,7 @@ import Ganeti.WConfd.ConfigState
>  -- locking state.
>  data DaemonState = DaemonState
>    { dsConfigState :: ConfigState
> -  , dsLockAllocation :: GanetiLockAllocation
> +  , dsLockWaiting :: GanetiLockWaiting
>    }
>
>  $(makeCustomLenses ''DaemonState)
> @@ -93,7 +96,7 @@ data DaemonHandle = DaemonHandle
>
>  mkDaemonHandle :: FilePath
>                 -> ConfigState
> -               -> GanetiLockAllocation
> +               -> GanetiLockWaiting
>                 -> (IO ConfigState -> ResultG (AsyncWorker ()))
>                    -- ^ A function that creates a worker that asynchronously
>                    -- saves the configuration to the master file.
> @@ -103,7 +106,7 @@ mkDaemonHandle :: FilePath
>                 -> (IO ConfigState -> ResultG (AsyncWorker ()))
>                    -- ^ A function that creates a worker that asynchronously
>                    -- distributes SSConf to nodes
> -               -> (IO GanetiLockAllocation -> ResultG (AsyncWorker ()))
> +               -> (IO GanetiLockWaiting -> ResultG (AsyncWorker ()))
>                    -- ^ A function that creates a worker that asynchronously
>                    -- saves the lock allocation state.
>                 -> ResultG DaemonHandle
> @@ -117,7 +120,7 @@ mkDaemonHandle cpath cstat lstat
>    ssconfWorker <- distSSConfWorkerFn readConfigIO
>    distMCsWorker <- distMCsWorkerFn readConfigIO
>
> -  saveLockWorker <- saveLockWorkerFn $ dsLockAllocation `liftM` readIORef ds
> +  saveLockWorker <- saveLockWorkerFn $ dsLockWaiting `liftM` readIORef ds
>
>    return $ DaemonHandle ds cpath saveWorker distMCsWorker ssconfWorker
>                                   saveLockWorker
> @@ -200,27 +203,33 @@ modifyConfigState f = do
>      return ()
>    return r
>
> --- | Atomically modifies the lock allocation state in WConfdMonad.
> -modifyLockAllocation :: (GanetiLockAllocation -> (GanetiLockAllocation, a))
> +-- | Atomically modifies the lock waiting state in WConfdMonad.
> +modifyLockWaiting :: (GanetiLockWaiting -> ( GanetiLockWaiting
> +                                           , (a, S.Set ClientId) ))
>                       -> WConfdMonad a
> -modifyLockAllocation f = do
> +modifyLockWaiting f = do
>    dh <- lift . WConfdMonadInt $ ask
>    let f' = swap . (fst &&& id) . f
> -  (lockAlloc, r) <- atomicModifyIORef (dhDaemonState dh)
> -                                      (swap . traverseOf dsLockAllocationL f')
> +  (lockAlloc, (r, nfy)) <- atomicModifyIORef
> +                             (dhDaemonState dh)
> +                             (swap . traverseOf dsLockWaitingL f')
>    logDebug $ "Current lock status: " ++ J.encode lockAlloc
>    logDebug "Triggering lock state write"
>    liftBase . triggerAndWait . dhSaveLocksWorker $ dh
>    logDebug "Lock write finished"
> +  unless (S.null nfy) $ do
> +    logDebug . (++) "Locks became available for " . show $ S.toList nfy
> +    logWarning "Process notification not yet implemented"
>    return r
>
>  -- | Atomically modifies the lock allocation state in WConfdMonad, not
>  -- producing any result
> -modifyLockAllocation_ :: (GanetiLockAllocation -> GanetiLockAllocation)
> +modifyLockWaiting_ :: (GanetiLockWaiting -> (GanetiLockWaiting, S.Set ClientId))
>                        -> WConfdMonad ()
> -modifyLockAllocation_ = modifyLockAllocation . (flip (,) () .)
> +modifyLockWaiting_ = modifyLockWaiting . ((second $ (,) ()) .)
>
> --- | Read the lock allocation state.
> -readLockAllocation :: WConfdMonad GanetiLockAllocation
> -readLockAllocation = liftM dsLockAllocation . readIORef . dhDaemonState
> +-- | Read the underlying lock allocation.
> +readLockAllocation :: WConfdMonad (LockAllocation GanetiLocks ClientId)
> +readLockAllocation = liftM (getAllocation . dsLockWaiting)
> +                     . readIORef . dhDaemonState
>                       =<< daemonHandle
> diff --git a/src/Ganeti/WConfd/Server.hs b/src/Ganeti/WConfd/Server.hs
> index 903c5bc..569b3f7 100644
> --- a/src/Ganeti/WConfd/Server.hs
> +++ b/src/Ganeti/WConfd/Server.hs
> @@ -40,8 +40,8 @@ import System.Directory (doesFileExist)
>  import Ganeti.BasicTypes
>  import Ganeti.Daemon
>  import Ganeti.Logging (logInfo, logDebug)
> -import Ganeti.Locking.Allocation
>  import Ganeti.Locking.Locks
> +import Ganeti.Locking.Waiting
>  import qualified Ganeti.Path as Path
>  import Ganeti.THH.RPC
>  import Ganeti.UDSServer
> @@ -84,7 +84,7 @@ prepMain _ _ = do
>      (cdata, cstat) <- loadConfigFromFile conf_file
>      lock <- if lock_file_present
>                then loadLockAllocation lock_file
> -              else return emptyAllocation
> +              else return emptyWaiting
>      mkDaemonHandle conf_file
>                     (mkConfigState cdata)
>                     lock
> --
> 1.9.1.423.g4596e3a
>
>

LGTM, thanks

Reply via email to