Repository : ssh://darcs.haskell.org//srv/darcs/packages/stm On branch : master
http://hackage.haskell.org/trac/ghc/changeset/b72fafc5d58d90034d615d5426c744b849dc31b6 >--------------------------------------------------------------- commit b72fafc5d58d90034d615d5426c744b849dc31b6 Author: Simon Marlow <[email protected]> Date: Wed Jul 4 11:21:10 2012 +0100 Add TBQueue, bump version to 2.4 and document changes since 2.3 >--------------------------------------------------------------- Control/Concurrent/STM/TBQueue.hs | 175 +++++++++++++++++++++++++++++++++++++ stm.cabal | 16 +++- 2 files changed, 189 insertions(+), 2 deletions(-) diff --git a/Control/Concurrent/STM/TBQueue.hs b/Control/Concurrent/STM/TBQueue.hs new file mode 100644 index 0000000..42b04b2 --- /dev/null +++ b/Control/Concurrent/STM/TBQueue.hs @@ -0,0 +1,175 @@ +{-# OPTIONS_GHC -fno-warn-name-shadowing #-} +{-# LANGUAGE CPP, DeriveDataTypeable #-} + +#if __GLASGOW_HASKELL__ >= 701 +{-# LANGUAGE Trustworthy #-} +#endif + +----------------------------------------------------------------------------- +-- | +-- Module : Control.Concurrent.STM.TBQueue +-- Copyright : (c) The University of Glasgow 2012 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : [email protected] +-- Stability : experimental +-- Portability : non-portable (requires STM) +-- +-- 'TBQueue' is a bounded version of 'TQueue'. The queue has a maximum +-- capacity set when it is created. If the queue already contains the +-- maximum number of elements, then 'writeTBQueue' blocks until an +-- element is removed from the queue. +-- +-- The implementation is based on the traditional purely-functional +-- queue representation that uses two lists to obtain amortised /O(1)/ +-- enqueue and dequeue operations. +-- +----------------------------------------------------------------------------- + +module Control.Concurrent.STM.TBQueue ( + -- * TBQueue + TBQueue, + newTBQueue, + newTBQueueIO, + readTBQueue, + tryReadTBQueue, + peekTBQueue, + tryPeekTBQueue, + writeTBQueue, + unGetTBQueue, + isEmptyTBQueue, + ) where + + +import Control.Concurrent.STM + +#define _UPK_(x) {-# UNPACK #-} !(x) + +-- | 'TBQueue' is an abstract type representing a bounded FIFO channel. +data TBQueue a + = TBQueue _UPK_(TVar Int) -- CR: read capacity + _UPK_(TVar [a]) -- R: elements waiting to be read + _UPK_(TVar Int) -- CW: write capacity + _UPK_(TVar [a]) -- W: elements written (head is most recent) + +-- Total channel capacity remaining is CR + CW. Reads only need to +-- access CR, writes usually need to access only CW but sometimes need +-- CR. So in the common case we avoid contention between CR and CW. +-- +-- - when removing an element from R: +-- CR := CR + 1 +-- +-- - when adding an element to W: +-- if CW is non-zero +-- then CW := CW - 1 +-- then if CR is non-zero +-- then CW := CR - 1; CR := 0 +-- else **FULL** + +-- |Build and returns a new instance of 'TBQueue' +newTBQueue :: Int -- ^ maximum number of elements the queue can hold + -> STM (TBQueue a) +newTBQueue size = do + read <- newTVar [] + write <- newTVar [] + rsize <- newTVar 0 + wsize <- newTVar size + return (TBQueue rsize read wsize write) + +-- |@IO@ version of 'newTBQueue'. This is useful for creating top-level +-- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using +-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't +-- possible. +newTBQueueIO :: Int -> IO (TBQueue a) +newTBQueueIO size = do + read <- newTVarIO [] + write <- newTVarIO [] + rsize <- newTVarIO 0 + wsize <- newTVarIO size + return (TBQueue rsize read wsize write) + +-- |Write a value to a 'TBQueue'; blocks if the queue is full. +writeTBQueue :: TBQueue a -> a -> STM () +writeTBQueue (TBQueue rsize _read wsize write) a = do + w <- readTVar wsize + if (w /= 0) + then do writeTVar wsize (w - 1) + else do + r <- readTVar rsize + if (r /= 0) + then do writeTVar rsize 0 + writeTVar wsize (r - 1) + else retry + listend <- readTVar write + writeTVar write (a:listend) + +-- |Read the next value from the 'TBQueue'. +readTBQueue :: TBQueue a -> STM a +readTBQueue (TBQueue rsize read _wsize write) = do + xs <- readTVar read + r <- readTVar rsize + writeTVar rsize (r + 1) + case xs of + (x:xs') -> do + writeTVar read xs' + return x + [] -> do + ys <- readTVar write + case ys of + [] -> retry + _ -> do + let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be + -- short, otherwise it will conflict + writeTVar write [] + writeTVar read zs + return z + +-- | A version of 'readTBQueue' which does not retry. Instead it +-- returns @Nothing@ if no value is available. +tryReadTBQueue :: TBQueue a -> STM (Maybe a) +tryReadTBQueue c = fmap Just (readTBQueue c) `orElse` return Nothing + +-- | Get the next value from the @TBQueue@ without removing it, +-- retrying if the channel is empty. +peekTBQueue :: TBQueue a -> STM a +peekTBQueue c = do + x <- readTBQueue c + unGetTBQueue c x + return x + +-- | A version of 'peekTBQueue' which does not retry. Instead it +-- returns @Nothing@ if no value is available. +tryPeekTBQueue :: TBQueue a -> STM (Maybe a) +tryPeekTBQueue c = do + m <- tryReadTBQueue c + case m of + Nothing -> return Nothing + Just x -> do + unGetTBQueue c x + return m + +-- |Put a data item back onto a channel, where it will be the next item read. +-- Blocks if the queue is full. +unGetTBQueue :: TBQueue a -> a -> STM () +unGetTBQueue (TBQueue rsize read wsize _write) a = do + r <- readTVar rsize + if (r > 0) + then do writeTVar rsize (r - 1) + else do + w <- readTVar wsize + if (w > 0) + then writeTVar wsize (w - 1) + else retry + xs <- readTVar read + writeTVar read (a:xs) + +-- |Returns 'True' if the supplied 'TBQueue' is empty. +isEmptyTBQueue :: TBQueue a -> STM Bool +isEmptyTBQueue (TBQueue _rsize read _wsize write) = do + xs <- readTVar read + case xs of + (_:_) -> return False + [] -> do ys <- readTVar write + case ys of + [] -> return True + _ -> return False diff --git a/stm.cabal b/stm.cabal index 7c9366b..2db764d 100644 --- a/stm.cabal +++ b/stm.cabal @@ -1,11 +1,22 @@ name: stm -version: 2.3 +version: 2.4 license: BSD3 license-file: LICENSE maintainer: [email protected] synopsis: Software Transactional Memory category: Concurrency -description: A modular composable concurrency abstraction. +description: + A modular composable concurrency abstraction. + . + Changes in version 2.4 + . + * Added "Control.Concurrent.STM.TQueue" (a faster @TChan@) + * Added "Control.Concurrent.STM.TBQueue" (a bounded channel based on @TQueue@) + * @TChan@ has an @Eq@ instances + * Added @newBroadcastTChan@ and @newBroadcastTChanIO@ + * Some performance improvements for @TChan@ + * Added @cloneTChan@ + build-type: Simple cabal-version: >=1.6 @@ -23,6 +34,7 @@ library Control.Concurrent.STM.TChan Control.Concurrent.STM.TMVar Control.Concurrent.STM.TQueue + Control.Concurrent.STM.TBQueue Control.Monad.STM other-modules: Control.Sequential.STM _______________________________________________ Cvs-libraries mailing list [email protected] http://www.haskell.org/mailman/listinfo/cvs-libraries
