Michael,

On Tue, 2012-11-27 at 17:14 +0200, Michael Snoyman wrote:
> I think the stm-conduit package[1] may be helpful for this use case.
> Each time you get a new command, you can fork a thread and give it the
> TBMChan to write to, and you can use sourceTBMChan to get a source to
> send to the client.

That's more or less what I had in mind. I found stm-conduit earlier and
tried to get this working with it, but those attempts failed.

I attached an example which might clarify what I intend to do. I'm aware
it contains several potential bugs (leaking threads etc.), but that's
beside the point ;-)

If only I could figure out what to put in place of the 3 comment lines I
left in there...

Thanks for your help,

Nicolas

{-# LANGUAGE Rank2Types #-}

module Main where

import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan

import Control.Applicative
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO, liftIO)

data Command = Add Int Int
             | Disconnect
  deriving (Show)
data Reply = Result Int
  deriving (Show)

application :: MonadIO m => GConduit Int m String
application = do
    -- Create input and output channels to/from worker threads
    (chanIn, chanOut) <- liftIO $ (,) <$> newTBMChanIO 10 <*> newTBMChanIO 10

    -- Spawn some worker threads
    liftIO $ forM_ [0..5] $ \i -> forkIO $ processCommands i chanIn chanOut

    -- How can the values produced by
    --     sourceTBMChan chanOut
    -- all be yield'ed by this Conduit?

    loop chanIn
  where
    -- Loop retrieves one command from our source and pushes it to the
    -- worker threads input channel, then loops
    loop :: MonadIO m => TBMChan Command -> GConduit Int m String
    loop chan = do
        liftIO $ putStrLn "Enter loop"
        cmd <- getCommand
        liftIO $ do
            putStrLn $ "Got command: " ++ show cmd
            atomically $ writeTBMChan chan cmd
        case cmd of
            Disconnect -> return ()
            _ -> loop chan

    -- getCommand fetches and parses a single command from our source
    getCommand :: Monad m => GSink Int m Command
    getCommand = do
        v <- await
        case v of
            Nothing -> return Disconnect
            Just i -> return $ Add i 1

-- processCommands reads commands from a given input channel, processes
-- them, and pushes the result to a given output channel
processCommands :: Int -> TBMChan Command -> TBMChan Reply -> IO ()
processCommands i chanIn chanOut = do
    putStrLn $ "Enter processCommands " ++ show i
    cmd <- atomically $ readTBMChan chanIn
    putStrLn $ show i ++ " read command: " ++ show cmd
    case cmd of
        Nothing -> return ()
        Just (Add a b) -> do
            atomically $ writeTBMChan chanOut (Result (a + b))
            putStrLn $ show i ++ " pushed result"
            processCommands i chanIn chanOut
        Just Disconnect -> return ()


main :: IO ()
main = do
    res <- CL.sourceList [1..20] $= application $$ CL.consume
    putStrLn $ "Result: " ++ show res
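
For reference, the quoted suggestion in its simplest form can be sketched as below. This is a minimal sketch, assuming a recent conduit and stm-conduit (where `runConduit` and `.|` take the place of `$$` and `$=`). The name `collectReplies` and the single feeder thread are illustrative and not part of the program above. It only shows draining a TBMChan with sourceTBMChan; it does not answer how to merge that source into a conduit that is also consuming input.

```haskell
module Main where

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Monad (forM_)
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan
    (closeTBMChan, newTBMChanIO, sourceTBMChan, writeTBMChan)

-- Hypothetical helper: fork one feeder thread that writes results to a
-- bounded channel, then drain the channel as a conduit source.
collectReplies :: IO [String]
collectReplies = do
    chanOut <- newTBMChanIO 10

    -- The feeder stands in for the worker threads: it computes "Add i 1"
    -- for each input and closes the channel when it is done.
    _ <- forkIO $ do
        forM_ [1 .. 20 :: Int] $ \i ->
            atomically $ writeTBMChan chanOut (i + 1)
        atomically $ closeTBMChan chanOut

    -- sourceTBMChan yields values until the channel is closed and empty.
    runConduit $ sourceTBMChan chanOut .| CL.map show .| CL.consume

main :: IO ()
main = do
    res <- collectReplies
    putStrLn $ "Result: " ++ show res
```

Closing the channel is what ends the source here; with several writers, something would have to decide when the last one has finished before closing.
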
_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe