On Fri, Apr 11, 2014 at 12:43 PM, Klaus Aehlig <[email protected]> wrote:
> In this way, we will be able to support in WConfD waiting for locks > to become available instead of having to poll for them. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > src/Ganeti/Locking/Locks.hs | 28 ++++++++++++------------ > src/Ganeti/WConfd/Core.hs | 17 ++++++--------- > src/Ganeti/WConfd/DeathDetection.hs | 3 ++- > src/Ganeti/WConfd/Monad.hs | 43 > ++++++++++++++++++++++--------------- > src/Ganeti/WConfd/Server.hs | 4 ++-- > 5 files changed, 51 insertions(+), 44 deletions(-) > > diff --git a/src/Ganeti/Locking/Locks.hs b/src/Ganeti/Locking/Locks.hs > index 4be5bea..e9080fb 100644 > --- a/src/Ganeti/Locking/Locks.hs > +++ b/src/Ganeti/Locking/Locks.hs > @@ -30,7 +30,7 @@ module Ganeti.Locking.Locks > , lockName > , ClientType(..) > , ClientId(..) > - , GanetiLockAllocation > + , GanetiLockWaiting > , loadLockAllocation > , writeLocksAsyncTask > , LockLevel(..) > @@ -49,8 +49,8 @@ import qualified Text.JSON as J > import Ganeti.BasicTypes > import Ganeti.Errors (ResultG, GanetiException) > import Ganeti.JSON (readEitherString, fromJResultE) > -import Ganeti.Locking.Allocation > import Ganeti.Locking.Types > +import Ganeti.Locking.Waiting > import Ganeti.Logging.Lifted (MonadLog, logDebug, logEmergency) > import Ganeti.Types > import Ganeti.Utils.Atomic > @@ -230,33 +230,33 @@ instance J.JSON ClientId where > > -- | The type of lock Allocations in Ganeti. In Ganeti, the owner of > -- locks are jobs. > -type GanetiLockAllocation = LockAllocation GanetiLocks ClientId > +type GanetiLockWaiting = LockWaiting GanetiLocks ClientId Integer > > -- | Load a lock allocation from disk. > -loadLockAllocation :: FilePath -> ResultG GanetiLockAllocation > +loadLockAllocation :: FilePath -> ResultG GanetiLockWaiting > loadLockAllocation = > liftIO . readFile > - >=> fromJResultE "parsing lock allocation" . J.decodeStrict > + >=> fromJResultE "parsing lock waiting structure" . 
J.decodeStrict > > -- | Write lock allocation to disk, overwriting any previously lock > -- allocation stored there. >

Just nitpicking: s/previously/previous/

> writeLocks :: (MonadBase IO m, MonadError GanetiException m, MonadLog m) > - => FilePath -> GanetiLockAllocation -> m () > -writeLocks fpath lockAlloc = do > - logDebug "Async. lock allocation writer: Starting write" > - toErrorBase . liftIO . atomicWriteFile fpath $ J.encode lockAlloc > - logDebug "Async. lock allocation writer: written" > + => FilePath -> GanetiLockWaiting -> m () > +writeLocks fpath lockWait = do > + logDebug "Async. lock status writer: Starting write" > + toErrorBase . liftIO . atomicWriteFile fpath $ J.encode lockWait > + logDebug "Async. lock status writer: written" > > -- | Construct an asynchronous worker whose action is to save the > -- current state of the lock allocation. > -- The worker's action reads the lock allocation using the given @IO@ > -- action. Any inbetween changes to the file are tacitly ignored. > writeLocksAsyncTask :: FilePath -- ^ Path to the lock file > - -> IO GanetiLockAllocation -- ^ An action to read the > - -- current lock allocation > + -> IO GanetiLockWaiting -- ^ An action to read the > + -- current lock allocation > -> ResultG (AsyncWorker ()) > -writeLocksAsyncTask fpath lockAllocAction = mkAsyncWorker $ > +writeLocksAsyncTask fpath lockWaitingAction = mkAsyncWorker $ > catchError (do > - locks <- liftBase lockAllocAction > + locks <- liftBase lockWaitingAction > writeLocks fpath locks > ) (logEmergency . (++) "Can't write lock allocation status: " . 
show) > diff --git a/src/Ganeti/WConfd/Core.hs b/src/Ganeti/WConfd/Core.hs > index f751913..e008b5f 100644 > --- a/src/Ganeti/WConfd/Core.hs > +++ b/src/Ganeti/WConfd/Core.hs > @@ -41,6 +41,7 @@ import qualified Ganeti.JSON as J > import qualified Ganeti.Locking.Allocation as L > import Ganeti.Locking.Locks ( GanetiLocks(ConfigLock), > LockLevel(LevelConfig) > , lockLevel, LockLevel, ClientId ) > +import qualified Ganeti.Locking.Waiting as LW > import Ganeti.Objects (ConfigData) > import Ganeti.WConfd.Language > import Ganeti.WConfd.Monad > @@ -124,38 +125,34 @@ tryUpdateLocks :: ClientId -> GanetiLockRequest -> > WConfdMonad [ClientId] > tryUpdateLocks cid req = > liftM S.toList > . (>>= toErrorStr) > - $ modifyLockAllocation (L.updateLocks cid (fromGanetiLockRequest req)) > + $ modifyLockWaiting (LW.updateLocks cid (fromGanetiLockRequest req)) > > -- | Free all locks of a given owner (i.e., a job-id lockfile pair). > freeLocks :: ClientId -> WConfdMonad () > freeLocks cid = > - modifyLockAllocation_ (`L.freeLocks` cid) > + modifyLockWaiting_ $ LW.releaseResources cid > > -- | Free all locks of a given owner (i.e., a job-id lockfile pair) > -- of a given level in the Ganeti sense (e.g., "cluster", "node"). > freeLocksLevel :: ClientId -> LockLevel -> WConfdMonad () > freeLocksLevel cid level = > - modifyLockAllocation_ (L.freeLocksPredicate ((==) level . lockLevel) > - `flip` cid) > + modifyLockWaiting_ $ LW.freeLocksPredicate ((==) level . lockLevel) cid > > -- | Downgrade all locks of the given level to shared. > downGradeLocksLevel :: ClientId -> LockLevel -> WConfdMonad () > downGradeLocksLevel cid level = > - modifyLockAllocation_ $ L.downGradePredicate ((==) level . lockLevel) > cid > + modifyLockWaiting_ $ LW.downGradeLocksPredicate ((==) level . > lockLevel) cid > > -- | Intersect the possesed locks of an owner with a given set. > intersectLocks :: ClientId -> [GanetiLocks] -> WConfdMonad () > -intersectLocks cid = > - modifyLockAllocation_ . 
L.intersectLocks cid > +intersectLocks cid locks = modifyLockWaiting_ $ LW.intersectLocks locks > cid > > -- | Opportunistically allocate locks for a given owner. > opportunisticLockUnion :: ClientId > -> [(GanetiLocks, L.OwnerState)] > -> WConfdMonad [GanetiLocks] > opportunisticLockUnion cid req = > - liftM S.toList > - . modifyLockAllocation > - $ L.opportunisticLockUnion cid req > + modifyLockWaiting $ LW.opportunisticLockUnion cid req > > -- * The list of all functions exported to RPC. > > diff --git a/src/Ganeti/WConfd/DeathDetection.hs > b/src/Ganeti/WConfd/DeathDetection.hs > index e5d8a0b..414420b 100644 > --- a/src/Ganeti/WConfd/DeathDetection.hs > +++ b/src/Ganeti/WConfd/DeathDetection.hs > @@ -45,6 +45,7 @@ import System.Posix.IO > import Ganeti.BasicTypes > import qualified Ganeti.Constants as C > import qualified Ganeti.Locking.Allocation as L > +import qualified Ganeti.Locking.Waiting as LW > import Ganeti.Locking.Locks (ClientId(..)) > import Ganeti.Logging.Lifted (logDebug, logInfo) > import Ganeti.WConfd.Monad > @@ -76,7 +77,7 @@ cleanupLocksTask = forever . runResultT $ do > died <- liftIO (isDead fpath) > when died $ do > logInfo $ show owner ++ " died, releasing locks" > - modifyLockAllocation_ (`L.freeLocks` owner) > + modifyLockWaiting_ (LW.releaseResources owner) > _ <- liftIO . 
try $ removeFile fpath > :: WConfdMonad (Either IOError ()) > return () > diff --git a/src/Ganeti/WConfd/Monad.hs b/src/Ganeti/WConfd/Monad.hs > index 47e0219..4a676f3 100644 > --- a/src/Ganeti/WConfd/Monad.hs > +++ b/src/Ganeti/WConfd/Monad.hs > @@ -44,26 +44,29 @@ module Ganeti.WConfd.Monad > , daemonHandle > , modifyConfigState > , readConfigState > - , modifyLockAllocation > - , modifyLockAllocation_ > + , modifyLockWaiting > + , modifyLockWaiting_ > , readLockAllocation > ) where > > import Control.Applicative > -import Control.Arrow ((&&&)) > +import Control.Arrow ((&&&), second) > import Control.Monad > import Control.Monad.Base > import Control.Monad.Error > import Control.Monad.Reader > import Control.Monad.Trans.Control > import Data.IORef.Lifted > +import qualified Data.Set as S > import Data.Tuple (swap) > import qualified Text.JSON as J > > import Ganeti.BasicTypes > import Ganeti.Errors > import Ganeti.Lens > +import Ganeti.Locking.Allocation (LockAllocation) > import Ganeti.Locking.Locks > +import Ganeti.Locking.Waiting (getAllocation) > import Ganeti.Logging > import Ganeti.Utils.AsyncWorker > import Ganeti.WConfd.ConfigState > @@ -74,7 +77,7 @@ import Ganeti.WConfd.ConfigState > -- locking state. > data DaemonState = DaemonState > { dsConfigState :: ConfigState > - , dsLockAllocation :: GanetiLockAllocation > + , dsLockWaiting :: GanetiLockWaiting > } > > $(makeCustomLenses ''DaemonState) > @@ -93,7 +96,7 @@ data DaemonHandle = DaemonHandle > > mkDaemonHandle :: FilePath > -> ConfigState > - -> GanetiLockAllocation > + -> GanetiLockWaiting > -> (IO ConfigState -> ResultG (AsyncWorker ())) > -- ^ A function that creates a worker that > asynchronously > -- saves the configuration to the master file. 
> @@ -103,7 +106,7 @@ mkDaemonHandle :: FilePath > -> (IO ConfigState -> ResultG (AsyncWorker ())) > -- ^ A function that creates a worker that > asynchronously > -- distributes SSConf to nodes > - -> (IO GanetiLockAllocation -> ResultG (AsyncWorker ())) > + -> (IO GanetiLockWaiting -> ResultG (AsyncWorker ())) > -- ^ A function that creates a worker that > asynchronously > -- saves the lock allocation state. > -> ResultG DaemonHandle > @@ -117,7 +120,7 @@ mkDaemonHandle cpath cstat lstat > ssconfWorker <- distSSConfWorkerFn readConfigIO > distMCsWorker <- distMCsWorkerFn readConfigIO > > - saveLockWorker <- saveLockWorkerFn $ dsLockAllocation `liftM` readIORef > ds > + saveLockWorker <- saveLockWorkerFn $ dsLockWaiting `liftM` readIORef ds > > return $ DaemonHandle ds cpath saveWorker distMCsWorker ssconfWorker > saveLockWorker > @@ -200,27 +203,33 @@ modifyConfigState f = do > return () > return r > > --- | Atomically modifies the lock allocation state in WConfdMonad. > -modifyLockAllocation :: (GanetiLockAllocation -> (GanetiLockAllocation, > a)) > +-- | Atomically modifies the lock waiting state in WConfdMonad. > +modifyLockWaiting :: (GanetiLockWaiting -> ( GanetiLockWaiting > + , (a, S.Set ClientId) )) > -> WConfdMonad a > -modifyLockAllocation f = do > +modifyLockWaiting f = do > dh <- lift . WConfdMonadInt $ ask > let f' = swap . (fst &&& id) . f > - (lockAlloc, r) <- atomicModifyIORef (dhDaemonState dh) > - (swap . traverseOf > dsLockAllocationL f') > + (lockAlloc, (r, nfy)) <- atomicModifyIORef > + (dhDaemonState dh) > + (swap . traverseOf dsLockWaitingL f') > logDebug $ "Current lock status: " ++ J.encode lockAlloc > logDebug "Triggering lock state write" > liftBase . triggerAndWait . dhSaveLocksWorker $ dh > logDebug "Lock write finished" > + unless (S.null nfy) $ do > + logDebug . (++) "Locks became available for " . 
show $ S.toList nfy > + logWarning "Process notification not yet implemented" > return r > > -- | Atomically modifies the lock allocation state in WConfdMonad, not > -- producing any result > -modifyLockAllocation_ :: (GanetiLockAllocation -> GanetiLockAllocation) > +modifyLockWaiting_ :: (GanetiLockWaiting -> (GanetiLockWaiting, S.Set > ClientId)) > -> WConfdMonad () > -modifyLockAllocation_ = modifyLockAllocation . (flip (,) () .) > +modifyLockWaiting_ = modifyLockWaiting . ((second $ (,) ()) .) > > --- | Read the lock allocation state. > -readLockAllocation :: WConfdMonad GanetiLockAllocation > -readLockAllocation = liftM dsLockAllocation . readIORef . dhDaemonState > +-- | Read the underlying lock allocation. > +readLockAllocation :: WConfdMonad (LockAllocation GanetiLocks ClientId) > +readLockAllocation = liftM (getAllocation . dsLockWaiting) > + . readIORef . dhDaemonState > =<< daemonHandle > diff --git a/src/Ganeti/WConfd/Server.hs b/src/Ganeti/WConfd/Server.hs > index 903c5bc..569b3f7 100644 > --- a/src/Ganeti/WConfd/Server.hs > +++ b/src/Ganeti/WConfd/Server.hs > @@ -40,8 +40,8 @@ import System.Directory (doesFileExist) > import Ganeti.BasicTypes > import Ganeti.Daemon > import Ganeti.Logging (logInfo, logDebug) > -import Ganeti.Locking.Allocation > import Ganeti.Locking.Locks > +import Ganeti.Locking.Waiting > import qualified Ganeti.Path as Path > import Ganeti.THH.RPC > import Ganeti.UDSServer > @@ -84,7 +84,7 @@ prepMain _ _ = do > (cdata, cstat) <- loadConfigFromFile conf_file > lock <- if lock_file_present > then loadLockAllocation lock_file > - else return emptyAllocation > + else return emptyWaiting > mkDaemonHandle conf_file > (mkConfigState cdata) > lock > -- > 1.9.1.423.g4596e3a >

LGTM, thanks
