On 12-02-13 10:12 AM, Roel van Dijk wrote:
Hello,

I have a program which I believe can benefit from concurrency. But I
am wondering if the mechanisms I need already exist somewhere on
Hackage.

You can try monad-coroutine. Here is an incomplete transcription of your code:

import Control.Monad
import Control.Monad.Coroutine
import Control.Monad.Coroutine.SuspensionFunctors
import Control.Monad.Trans.Class

type Producer a = Coroutine (Yield a) IO ()
type Converter a b = Coroutine (EitherFunctor (Await a) (Yield b)) IO ()
type Consumer b = Coroutine (Await b) IO ()

producer :: Producer Int
producer = forM_ [1..10] yield

converter :: Converter Int Int
converter =  forever $ do a <- mapSuspension LeftF await
                          mapSuspension RightF $ yield (10 * a)

consumer :: Consumer Int
consumer = forever (await >>= lift . print)

main = seesaw parallelBinder awaitYieldResolver consumer producer

The main function is incorrect because it's not using the converter. I didn't add that because there's no ready function for invoking it in the package so it wouldn't be a one-liner, and also because I'm not sure if it should be a monadic coroutine of its own or a pure function.

The producer and consumer coroutines' steps should be running in parallel. I say "should" because I never got any decent parallel speedup with this scheme, but I'm not sure if the problem is in the library code or in the earlier versions of GHC.

One thing that's missing here is a way for the producer to run ahead of the consumer and produce more results that would be buffered. At the moment the producer blocks until the consumer is ready. I don't have a ready solution for this, but I'll give it some thought.



Here is a sketch of my program, in literate Haskell:

module Problem where
import Control.Monad ( forM_ )
The producer produces values. It blocks until there are now more
values to produce. Each value is given to a callback function.

type Producer a = (a ->  IO ()) ->  IO ()
The converter does some work with a value. This work is purely CPU and
it is the bottleneck of the program. The amount of work it has to do
is variable.

type Converter a b = a ->  b
The consumer does something with the value calculated by the
converter. It is very important that the consumer consumes the values
in the same order as they are produced.

type Consumer b = b ->  IO ()
Dummy producer, converter and consumer:

producer :: Producer Int
producer callback = forM_ [1..10] callback
converter :: Converter Int Int
converter = (*10)
consumer :: Consumer Int
consumer = print
A simple driver. Does not exploit concurrency.

simpleDriver :: Producer a ->  Converter a b ->  Consumer b ->  IO ()
simpleDriver producer converter consumer = producer (consumer . converter)
current_situation :: IO ()
current_situation = simpleDriver producer converter consumer
Ideally I would like a driver that spawns a worker thread for each
core in my system. But the trick is in ensuring that the consumer is
offered results in the same order as they are generated by the
producer.

I can envision that some kind of storage is necessary to keep track of
results which can not yet be offered to the consumer because it is
still waiting for an earlier result.

Is there any package on Haskell that can help me with this problem? Or
do I have to implement it using lower level concurrency primitives?

Regards,
Roel

_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe


_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe

Reply via email to