How about this for a possible implementation using pipes-concurrency:

{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE RankNTypes #-}

import qualified Control.Foldl as Foldl
import Control.Concurrent.Async
import Control.Exception
import Pipes
import qualified Pipes.Prelude as Pipes
import Pipes.Concurrent

-- A fold that consumes a whole Producer and returns its result
-- together with the Producer's return value.
newtype Fold1 b a = Fold1
    { runFold1 :: forall r. Producer b IO r -> IO (a, r) }
    deriving (Functor)

-- Lift a pure fold from the foldl package into a Fold1.
withFold :: Foldl.Fold b a -> Fold1 b a
withFold aFold = Fold1 (adapt (Foldl.generalize aFold))
  where
    adapt f = \producer -> Foldl.impurely Pipes.foldM' f producer

-- Feed one producer to two folds, each running in its own thread.
-- Each fold reads from a bounded mailbox, so the producer blocks
-- once either mailbox holds bufsize pending items.
appFold1 :: Int -> Producer b IO r -> Fold1 b a1 -> Fold1 b a2 -> IO ((a1, a2), r)
appFold1 bufsize producer (Fold1 fs) (Fold1 as) = do
    (outbox1, inbox1, seal1) <- spawn' (bounded bufsize)
    (outbox2, inbox2, seal2) <- spawn' (bounded bufsize)
    runConcurrently $
        (\(a1, ()) (a2, ()) r -> ((a1, a2), r))
        <$>
        Concurrently (fs (fromInput inbox1) `finally` atomically seal1)
        <*>
        Concurrently (as (fromInput inbox2) `finally` atomically seal2)
        <*>
        -- Writer thread: copy every element into both mailboxes, then
        -- seal them so the folds see end of input.
        (Concurrently $
            (runEffect (producer >-> Pipes.tee (toOutput outbox1 *> Pipes.drain)
                                 >-> (toOutput outbox2 *> Pipes.drain)))
            `finally` atomically seal1
            `finally` atomically seal2)

I haven't tested it, however.
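
For comparison, if running the reports in separate threads turns out not to be essential, the Applicative instance of Foldl.Fold already combines several folds into a single pass over the stream, with no buffering at all. A minimal sketch; the report type (a sum paired with a count) and the names report and runReport are made up for illustration:

```haskell
import qualified Control.Foldl as Foldl
import Pipes
import qualified Pipes.Prelude as Pipes

-- Hypothetical report, for illustration only: the sum of the elements
-- paired with the number of elements, computed in one traversal.
report :: Foldl.Fold Int (Int, Int)
report = (,) <$> Foldl.sum <*> Foldl.length

-- Run the combined fold over a producer in a single pass.
runReport :: Producer Int IO () -> IO (Int, Int)
runReport = Foldl.purely Pipes.fold report

main :: IO ()
main = runReport (each [1 .. 10]) >>= print
```

Here both folds advance in lockstep on one thread, so each element is handed to both and then dropped. The concurrent version above trades that simplicity for parallelism, which should only pay off when the individual folds are expensive.
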
On Tuesday, March 29, 2016 at 1:13:35 PM UTC+2, Kostiantyn Rybnikov wrote:
>
> I have a pipe which consumes data from storage and folds it with the foldl
> package. It's all great and wonderful. But now I have multiple tasks which
> stream basically the same data but fold it with different folds into
> different report types, so I want to gather them into a single piped
> stream which gets folded into multiple reports at once. On one hand, I'd
> like a guarantee that the data is processed in constant memory; on the
> other hand, I'd like the reports to be computed in different threads.
>
> What are the best practices to achieve this? Is there a way to "tee" a
> pipe, but make sure it blocks when the consumer of one channel's copy gets
> "too fast"? E.g. when the slower report is 2 times slower and the faster
> report has consumed 2000 items while the slower one has only consumed
> 1000, I'd like to block the faster report until the slower one catches up,
> so that no more than 1000 elements are held in memory.
>
> Any ideas or experiences are welcome. Thank you!
>