The general rule of thumb is that you should only use concurrency for
two reasons:
* Increased performance (and only if the increase offsets the cost of
context switching)
* Waiting on multiple concurrent input streams
Generally you should avoid using concurrency purely as a control flow
mechanism and instead prefer pure, single-threaded ways of forking
input streams. The main reason to avoid concurrency unless absolutely
necessary is that concurrent code is very difficult to test and reason
about.
The answer to this question depends on how `groupSmart` and `groupDamn`
are implemented. However, I can give one example if I can make certain
assumptions.
Let's assume for simplicity that `groupSmart`/`groupDamn` emit one
output element for every input element. In that case you can actually
encode them as `Fold`s from my `foldl` library:

    groupSmart :: Fold I O1
    groupDamn  :: Fold I O2

Then if you wanted to run both grouping mechanisms in parallel over the
same stream, you would just combine them using `Applicative` syntax:

    groupBoth :: Fold I (O1, O2)
    groupBoth = (,) <$> groupSmart <*> groupDamn

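As a concrete illustration of the `Applicative` combination above, here is a minimal sketch. It assumes `L.sum` and `L.length` as hypothetical stand-ins for `groupSmart` and `groupDamn` (the real grouping logic is not shown in this thread) and runs the combined fold over a plain list:

```haskell
import Control.Foldl (Fold)
import qualified Control.Foldl as L

-- Hypothetical stand-in for groupSmart: a running sum.
runningSum :: Fold Int Int
runningSum = L.sum

-- Hypothetical stand-in for groupDamn: a running element count.
runningCount :: Fold Int Int
runningCount = L.length

-- Both folds consume the same input in a single pass.
runningBoth :: Fold Int (Int, Int)
runningBoth = (,) <$> runningSum <*> runningCount

main :: IO ()
main = do
    -- L.fold runs the combined fold to completion.
    print (L.fold runningBoth [10, 20, 30])
    -- prints (60,3)

    -- L.scan exposes every intermediate result, starting with the
    -- result for the initial (empty) accumulator.
    print (L.scan runningBoth [10, 20, 30])
    -- prints [(0,0),(10,1),(30,2),(60,3)]
```

Note that the scan reports one more result than there are inputs, because it also emits the result for the empty prefix.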
Then you would transform that into a `Pipe` by using:

    import Control.Foldl (purely)
    import Pipes
    import qualified Pipes.Prelude as Pipes

    pipeBoth :: Monad m => Pipe I (O1, O2) m r
    pipeBoth = purely Pipes.scan groupBoth

Then you would just write something like:

    example :: MonadIO io => Consumer I io r
    example = for (purely Pipes.scan groupBoth) (\(o1, o2) -> do
        liftIO (writeToHandle handle1 o1)
        liftIO (writeToHandle handle2 o2))

... and now you can do everything within a single pipeline.
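For reference, a self-contained version of that pipeline might look like the sketch below. It is built on assumptions: `groupBoth` pairs `L.sum` with `L.length` as hypothetical stand-ins for the real folds, the two handles are passed in as arguments, and `hPrint` takes the place of the unspecified `writeToHandle`:

```haskell
import Control.Foldl (Fold, purely)
import qualified Control.Foldl as L
import Pipes
import qualified Pipes.Prelude as Pipes
import System.IO (Handle, hPrint, stderr, stdout)

-- Hypothetical stand-in for the combined grouping fold.
groupBoth :: Fold Int (Int, Int)
groupBoth = (,) <$> L.sum <*> L.length

-- Scan the input with the combined fold and send each half of the
-- result to its own handle. hPrint stands in for writeToHandle.
example :: MonadIO io => Handle -> Handle -> Consumer Int io r
example handle1 handle2 =
    for (purely Pipes.scan groupBoth) $ \(o1, o2) -> do
        liftIO (hPrint handle1 o1)
        liftIO (hPrint handle2 o2)

main :: IO ()
main = runEffect (each [10, 20, 30] >-> example stdout stderr)
```

Everything here runs in one thread: the running sums go to `stdout` and the running counts go to `stderr`, with no `spawn` or `async` involved. Because `Pipes.scan` emits the initial accumulator before it reads any input, each handle receives four lines for the three inputs.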
You can also give the `groupSmart`/`groupDamn` folds access to the
`StateT` layer by generalizing them to `FoldM`s instead:

    groupSmart :: FoldM (StateT References IO) I O1
    groupDamn  :: FoldM (StateT References IO) I O2

... and the only change you would make is to use `impurely Pipes.scanM`
instead of `purely Pipes.scan`.
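To make the `FoldM` variant concrete, here is a minimal sketch under stated assumptions: `References` is modeled as a plain `Int` counter rather than a map, `stream` is a hypothetical source that bumps that counter for each element, and `groupSmart` is a hypothetical fold that pairs a running sum with whatever the shared state currently holds:

```haskell
import Control.Foldl (FoldM (FoldM), impurely)
import Control.Monad.Trans.State.Strict (StateT, evalStateT, get, modify')
import Pipes
import qualified Pipes.Prelude as Pipes

-- Assumption: the accumulated references are just a counter here.
type References = Int

-- Hypothetical source: records each element in the state, then emits it.
stream :: Producer Int (StateT References IO) ()
stream = for (each [10, 20, 30]) $ \x -> do
    lift (modify' (+ 1))
    yield x

-- Hypothetical fold: keeps a running sum and, when producing a result,
-- reads the state that the source has accumulated so far.
groupSmart :: FoldM (StateT References IO) Int (Int, References)
groupSmart = FoldM step (return 0) done
  where
    step acc x = return (acc + x)
    done acc = do
        refs <- get
        return (acc, refs)

main :: IO ()
main = evalStateT (runEffect pipeline) 0
  where
    pipeline = stream >-> impurely Pipes.scanM groupSmart >-> Pipes.print
-- prints (0,0), (10,1), (30,2), (60,3), one per line
```

Since the source and the fold run in the same `StateT` layer of a single pipeline, there is only one initial state to supply (the `0` passed to `evalStateT`), and the fold sees every update made by the source.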
On 8/23/2015 5:31 AM, Alexey Raga wrote:
Hi,
I am using "pipes-concurrency" trying to model the following scenario:
my "source" pipe has type of

    stream :: Producer InputData (StateT References IO) ()

where "References" is just a map that I accumulate while streaming
the source. Then I am following the "broadcast" example from the
Tutorial attempting to "fork" my flow into two branches:

    main = do
        (output1, input1) <- spawn unbounded
        (output2, input2) <- spawn unbounded
        a1 <- async $ do
            execStateT (runEffect $ stream >-> toOutput (output1 <> output2))
                       emptyTables
            performGC
        a2 <- async $ do
            withFile "/tmp/geo/smart-grouping.xml" WriteMode $ \h -> do
                evalStateT (runEffect $ groupSmart (fromInput input1) >->
                            toHandle h) ??? -- what to put here?
                hFlush h
            performGC
        a3 <- async $ do
            withFile "/tmp/geo/damnGrouping.xml" WriteMode $ \h -> do
                evalStateT (runEffect $ groupDamn (fromInput input2) >->
                            toHandle h) ???
                hFlush h
            performGC
        mapM_ wait [a1,a2,a3]

I would like my branches to use the information from the source's
StateT, however following the types, it looks like I have to provide
each branch with the initial (empty?) state.
How do I share the state between the source and the forked branches?
Or how do I model this situation correctly?
Cheers,
Alexey.
--
You received this message because you are subscribed to the Google
Groups "Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send
an email to haskell-pipes+unsubscr...@googlegroups.com
<mailto:haskell-pipes+unsubscr...@googlegroups.com>.
To post to this group, send email to haskell-pipes@googlegroups.com
<mailto:haskell-pipes@googlegroups.com>.