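How about this as a possible implementation using pipes-concurrency? Each fold
runs in its own thread and reads from its own bounded mailbox, so the producer
blocks whenever either mailbox is full, which throttles the faster fold: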

    {-# LANGUAGE DeriveFunctor #-}
    {-# LANGUAGE RankNTypes #-}

    import qualified Control.Foldl as Foldl
    import Control.Concurrent.Async
    import Control.Exception
    import Pipes 
    import qualified Pipes.Prelude as Pipes
    import Pipes.Concurrent

    newtype Fold1 b a = Fold1
        { runFold1 :: forall r. Producer b IO r -> IO (a,r) }
        deriving (Functor)

    withFold :: Foldl.Fold b a -> Fold1 b a 
    withFold aFold = Fold1 (adapt (Foldl.generalize aFold))
        where
        adapt f = \producer -> Foldl.impurely Pipes.foldM' f producer

    appFold1 :: Int -> Producer b IO r -> Fold1 b a1 -> Fold1 b a2 -> IO ((a1,a2),r)
    appFold1 bufsize producer (Fold1 fs) (Fold1 as) = do
        (outbox1,inbox1,seal1) <- spawn' (bounded bufsize)
        (outbox2,inbox2,seal2) <- spawn' (bounded bufsize)
        runConcurrently $
            (\(a1,()) (a2,()) r -> ((a1,a2),r))
            <$>
            Concurrently (fs (fromInput inbox1) `finally` atomically seal1)
            <*>
            Concurrently (as (fromInput inbox2) `finally` atomically seal2)
            <*>
            (Concurrently $
                (runEffect (producer >-> Pipes.tee (toOutput outbox1 *> Pipes.drain)
                                     >-> (toOutput outbox2 *> Pipes.drain)))
                `finally` atomically seal1
                `finally` atomically seal2)

I haven't tested it, however.
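
For a quick sanity check, here is a smaller self-contained sketch of the same idea without the Fold1 wrapper: one producer is teed into two bounded mailboxes, and each mailbox is folded in its own thread. The names foldInput and demo are made up for this example, and the buffer size of 10 is arbitrary. It assumes pipes-concurrency 2.x (where `bounded` is a function) plus the foldl and async packages.

```haskell
import qualified Control.Foldl as Foldl
import Control.Concurrent.Async (Concurrently (..))
import Control.Exception (finally)
import Pipes
import qualified Pipes.Prelude as Pipes
import Pipes.Concurrent

-- Hypothetical helper: fold everything that arrives in a mailbox
-- with a pure foldl 'Fold'.
foldInput :: Foldl.Fold b a -> Input b -> IO a
foldInput aFold inbox = Foldl.purely Pipes.fold aFold (fromInput inbox)

-- Sum and count the numbers 1..1000 in two threads. Each mailbox holds
-- at most 10 elements, so the feeding thread blocks when either fold
-- falls behind.
demo :: IO (Int, Int)
demo = do
    (outbox1, inbox1, seal1) <- spawn' (bounded 10)
    (outbox2, inbox2, seal2) <- spawn' (bounded 10)
    let feed = runEffect (each [1 .. 1000 :: Int]
                   >-> Pipes.tee (toOutput outbox1 *> Pipes.drain)
                   >-> toOutput outbox2)
    runConcurrently $
        (\s n () -> (s, n))
        <$> Concurrently (foldInput Foldl.sum inbox1 `finally` atomically seal1)
        <*> Concurrently (foldInput Foldl.length inbox2 `finally` atomically seal2)
        -- Seal both mailboxes once the producer is exhausted, so that
        -- the two folds see end-of-input and terminate.
        <*> Concurrently (feed `finally` atomically (seal1 >> seal2))
```

If it behaves as intended, `demo` should return `(500500, 1000)`. Sealing in `finally` matters: without it, a fold would wait forever on an empty mailbox after the producer stops.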

On Tuesday, March 29, 2016 at 1:13:35 PM UTC+2, Kostiantyn Rybnikov wrote:
>
> I have a pipe which consumes data from storage and folds it with the foldl 
> package. It's all great and wonderful. But now I have multiple tasks 
> which stream basically the same data but fold it with different folds 
> into different report types, so I want to gather them into a single piped 
> stream which gets folded into multiple reports at once. On one hand, I'd 
> like a guarantee that the data is processed in constant memory; on the 
> other hand, I'd like the reports to be computed in different threads.
>
> What are the best practices to achieve this? Is there a way to make a 
> "tee" on a pipe, but make sure it blocks when the consumer of one 
> channel's copy gets "too fast"? E.g., when the slower report is 2 times 
> slower and the faster report has consumed 2000 items while the slower one 
> has only consumed 1000, I'd like to block the faster report until the 
> slower one catches up, so as not to have more than 1000 elements in memory.
>
> Any ideas or experiences are welcome. Thank you!
>
