Hi guys,
>> I'm not happy with asynchronous I/O in Haskell. It's hard to reason
>> about, and doesn't compose well.
>
> Async I/O *is* tricky if you're expecting threads to do their own
> writes/reads directly to/from sockets. I find that using a
> message-passing approach for communication makes this much easier.
Yes, that is true. I've always felt that spreading I/O code all over the
software is a choice that makes the programmer's life unnecessarily hard.
The (IMHO superior) alternative is to have one central I/O loop that
generates buffers of input, passes them to a callback function, and
receives buffers of output in response.
I have attached a short module that implements the following function:
type ByteCount = Word16
type Capacity = Word16
data Buffer = Buf !Capacity !(Ptr Word8) !ByteCount
type BlockHandler st = Buffer -> st -> IO (Buffer, st)
runLoop :: ReadHandle -> Capacity -> BlockHandler st -> st -> IO st
That setup is ideal for implementing streaming services, where there is
only one connection on which some kind of dialog between client and server
takes place, e.g. an HTTP server.
Programs like BitTorrent, on the other hand, are much harder to design,
because there are a great number of seemingly individual I/O contexts
(that is, the machine is talking to hundreds, or even thousands, of other
machines), but all those communications need to be coordinated in one
way or another.
A solution for that problem invariably ends up looking like a massive
finite state machine, which is somewhat unpleasant.
Take care,
Peter
{-# LANGUAGE DeriveDataTypeable #-}
{- |
Module : BlockIO
License : BSD3
Maintainer : [email protected]
Stability : provisional
Portability : DeriveDataTypeable
'runLoop' drives a 'BlockHandler' with data read from the
input stream until 'hIsEOF' ensues. Everything else has
to be done by the callback; runLoop just does the I\/O.
But it does it /fast/.
-}
module BlockIO where
import Prelude hiding ( catch, rem )
import Control.Exception
import Control.Monad.State
import Data.List
import Data.Typeable
import System.IO
import System.IO.Error hiding ( catch )
import Foreign hiding ( new )
import System.Timeout
-- * Static Buffer I\/O
-- |The 'Handle' that input is read from; this is the handle
-- type taken by 'slurp' and 'runLoopNB'.
type ReadHandle = Handle
-- |A 'Handle' meant for output. No definition visible in this
-- module uses it.
type WriteHandle = Handle
-- |A count of octets, e.g. how many octets a 'Buffer' currently
-- holds or how many should be dropped by 'flush'.
type ByteCount = Word16

-- |The total number of octets a 'Buffer' has room for.
type Capacity = Word16

-- |A fixed-size I\/O buffer: its capacity, a pointer to the
-- start of its storage, and the number of octets in use.
-- All three fields are strict.
data Buffer
  = Buf !Capacity !(Ptr Word8) !ByteCount
  deriving (Eq, Show, Typeable)
-- |Run the given computation with a freshly allocated, empty
-- 'Buffer' of the requested capacity. The storage is released
-- when the computation returns or throws, so the 'Buffer' must
-- not be used after the callback has finished.
--
-- A capacity of 0 is rejected with a user 'IOError' raised in
-- 'IO'.
withBuffer :: Capacity -> (Buffer -> IO a) -> IO a
-- Both arguments are matched here so that the failure is an 'IO'
-- action; with only one argument the right-hand side would be
-- typed in the function monad instead of 'IO'.
withBuffer 0 _   = ioError (userError "BlockIO.withBuffer with size 0 doesn't make sense")
withBuffer n act = bracket cons dest act
  where
    -- Allocate @n@ octets and wrap them as an empty buffer.
    cons = do
      p <- mallocArray (fromIntegral n)
      return (Buf n p 0)
    dest (Buf _ p _) = free p
-- |Drop the first @n <= len@ octets from the buffer, shifting
-- the remaining octets down to the start of the storage.
--
-- Returns a buffer with the same capacity and pointer whose
-- length is reduced by @n@. Asking to drop more octets than the
-- buffer holds is a programming error and trips an 'assert'.
flush :: ByteCount -> Buffer -> IO Buffer
flush 0 buf = return buf
flush n (Buf cap ptr len) = assert (n <= len) $ do
    let src  = ptr `plusPtr` fromIntegral n
        rest = fromIntegral len - fromIntegral n :: Int
    -- Source and destination overlap whenever more than @n@
    -- octets remain, so this has to be 'moveArray' (memmove
    -- semantics); 'copyArray' is only defined for disjoint areas.
    when (rest > 0) (moveArray ptr src rest)
    return (Buf cap ptr (fromIntegral rest))
-- |Timeout value that 'slurp' hands to 'timeout' from
-- "System.Timeout": a duration in microseconds, where a negative
-- value means "never time out" ('runLoop' passes @-1@).
type Timeout = Int
-- |If there is space, read and append more octets; then
-- return the modified buffer. In case of 'hIsEOF',
-- 'Nothing' is returned. If the buffer is full already,
-- a 'BufferOverflow' exception is thrown. When the timeout
-- expires before any octet could be read, 'ReadTimeout' is
-- thrown.
slurp :: Timeout -> ReadHandle -> Buffer -> IO (Maybe Buffer)
slurp to h b@(Buf cap ptr len) = do
    -- 'throwIO' rather than 'throw': the exception is raised when
    -- the action runs, in sequence with the surrounding I\/O.
    when (cap <= len) (throwIO (BufferOverflow h b))
    result <- timeout to (handleEOF readSome)
    case result of
      Nothing -> throwIO (ReadTimeout to h b)
      Just r  -> return r
  where
    -- First unused octet and the room left behind it.
    dst  = ptr `plusPtr` fromIntegral len
    room = fromIntegral (cap - len)
    readSome = do
      rc <- hGetBufNonBlocking h dst room
      if rc > 0
        then return (Buf cap ptr (len + fromIntegral rc))
        -- Nothing was available: block until the handle has input
        -- (this is where an EOF error surfaces for 'handleEOF'),
        -- then try again.
        else hWaitForInput h (-1) >> readSome
-- * BlockHandler and I\/O Driver
-- |A callback function suitable for use with 'runLoop'
-- takes a buffer and a state, then returns a modified
-- buffer and a modified state. Usually the callback will
-- use 'flush' to remove data it has processed already.
type BlockHandler st = Buffer -> st -> IO (Buffer, st)
-- |Shape of an error handler: given an exception and a state,
-- produce a state in 'IO'. 'runLoopNB' takes a handler of this
-- shape with the exception type fixed to 'SomeException'.
type ExceptionHandler st e = e -> st -> IO st
-- |Our main I\/O driver. Repeatedly 'slurp's input into a buffer
-- of the given capacity and hands that buffer to the callback,
-- threading the callback's state through, until end of input.
--
-- An exception raised by 'slurp' or by the callback ends the
-- loop: the error handler is run exactly once, with the state
-- the failing iteration started from, and whatever it returns
-- (or throws) is the outcome of 'runLoopNB'.
runLoopNB
  :: (st -> Timeout)                 -- ^ user state provides timeout
  -> (SomeException -> st -> IO st)  -- ^ user provides I\/O error handler
  -> ReadHandle                      -- ^ the input source
  -> Capacity                        -- ^ I\/O buffer size
  -> BlockHandler st                 -- ^ callback
  -> st                              -- ^ initial callback state
  -> IO st                           -- ^ return final callback state
runLoopNB mkTO errH hIn cap f initST = withBuffer cap (`ioloop` initST)
  where
    ioloop buf st = buf `seq` st `seq` do
      -- Guard one iteration only. Guarding the recursive call too
      -- would pile up one handler per block read, and a handler
      -- that rethrows would be re-entered at every enclosing level.
      outcome <- try (step buf st)
      case outcome of
        Left err                 -> errH err st
        Right Nothing            -> return st
        Right (Just (buf', st')) -> ioloop buf' st'

    -- Read more input and run the callback once; 'Nothing' means
    -- end of input.
    step buf st = do
      rc <- slurp (mkTO st) hIn buf
      case rc of
        Nothing   -> return Nothing
        Just buf' -> do
          (buf'', st') <- f buf' st
          -- Force both results while still inside the guarded
          -- step, so an exception hidden in a lazy result is
          -- routed to the error handler as well.
          buf'' `seq` st' `seq` return (Just (buf'', st'))
-- |'runLoopNB' without a timeout and without error recovery:
-- the timeout is always @-1@ and every exception is simply
-- thrown again to the caller.
runLoop :: ReadHandle -> Capacity -> BlockHandler st -> st -> IO st
runLoop = runLoopNB noTimeout rethrow
  where
    noTimeout _   = -1
    rethrow err _ = throw err
-- * Handler Combinators
-- |A handler that sees the buffered octets as a list. It
-- reports how many octets it consumed from the /front/ of that
-- list (these are then dropped) together with its new state.
type StreamHandler st = [Word8] -> st -> IO (ByteCount, st)

-- |Turn a 'StreamHandler' into a 'BlockHandler': copy the
-- buffer's contents into a list, run the handler on it, and
-- 'flush' as many octets as the handler says it consumed.
handleStream :: StreamHandler st -> BlockHandler st
handleStream f buf@(Buf _ ptr len) st = do
    octets      <- peekArray (fromIntegral len) ptr
    (used, st') <- f octets st
    remaining   <- flush used buf
    return (remaining, st')
-- * I\/O Exceptions
-- |Raised by 'slurp' when it is asked to read into a buffer
-- that has no free space left. Carries the handle and the
-- buffer in question.
data BufferOverflow
  = BufferOverflow ReadHandle Buffer
  deriving (Show, Typeable)

instance Exception BufferOverflow
-- |Raised by 'slurp' when the read did not complete within the
-- timeout. Carries the timeout that was in effect, the handle,
-- and the buffer as it was before the read.
data ReadTimeout
  = ReadTimeout Timeout ReadHandle Buffer
  deriving (Show, Typeable)

instance Exception ReadTimeout
-- * Internal Helper Functions
-- |Run an action, turning an 'isEOFError' failure into
-- 'Nothing' and a normal result @x@ into @Just x@. Any other
-- exception propagates to the caller. Used by 'slurp'.
handleEOF :: IO a -> IO (Maybe a)
handleEOF f = catchJust eofOnly (fmap Just f) (\() -> return Nothing)
  where
    -- Select end-of-file errors only; everything else is left
    -- for the caller to deal with.
    eofOnly :: IOException -> Maybe ()
    eofOnly err
      | isEOFError err = Just ()
      | otherwise      = Nothing
-- |Our version of C's @strstr(3)@: look for the first
-- occurrence of the token in the list. On success the result
-- is the offset of the first element /after/ the match (match
-- position plus token length); if the token does not occur,
-- the result is 'Nothing'. Only non-empty suffixes are
-- inspected, so an empty list always gives 'Nothing'.
strstr :: [Word8] -> [Word8] -> Maybe Int
strstr tok haystack =
    case filter matchesAt (zip [0 ..] suffixes) of
      (start, _) : _ -> Just (start + length tok)
      []             -> Nothing
  where
    suffixes            = takeWhile (not . null) (tails haystack)
    matchesAt (_, rest) = tok `isPrefixOf` rest
-- |Split a list at every occurrence of a delimiter. The
-- delimiters themselves are not part of the result; leading,
-- trailing and adjacent delimiters produce empty tokens, and an
-- empty input produces a single empty token.
--
-- > splitList "," "a,b,,c" == ["a","b","","c"]
--
-- An empty delimiter never matches: a non-empty input comes
-- back as one token, and an empty input yields no tokens.
--
-- The result is produced lazily and each token is built in time
-- linear in its length.
splitList :: Eq a => [a] -> [a] -> [[a]]
splitList d' l'
    | null d'   = [l' | not (null l')]
    | otherwise = tokens l'
  where
    skip = length d'

    -- Emit one token, then continue behind the delimiter if one
    -- was found.
    tokens xs = case cut xs of
        (tok, Nothing)   -> [tok]
        (tok, Just rest) -> tok : tokens rest

    -- Split at the first delimiter: the token before it and, if a
    -- delimiter was found, the input following that delimiter.
    cut xs
        | d' `isPrefixOf` xs = ([], Just (drop skip xs))
        | otherwise = case xs of
            []     -> ([], Nothing)
            y : ys -> let (tok, rest) = cut ys in (y : tok, rest)
_______________________________________________
Haskell-Cafe mailing list
[email protected]
http://www.haskell.org/mailman/listinfo/haskell-cafe