Yes, I’m thinking of regular parallelism. The “par”/“pseq” pattern looks like what I want, as far as I can see.
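For reference, `par` and `pseq` come from Control.Parallel. A minimal sketch of the pattern (the name `parSum` and the example lists are mine, not from this thread; build with -threaded and run with +RTS -N to get actual parallelism):

```haskell
import Control.Parallel (par, pseq)

-- x `par` y sparks x for possible parallel evaluation and returns y;
-- x `pseq` y evaluates x to WHNF before returning y.
-- Together they evaluate the two sums on (potentially) separate cores.
parSum :: [Int] -> [Int] -> Int
parSum xs ys = x `par` (y `pseq` (x + y))
  where
    x = sum xs
    y = sum ys

main :: IO ()
main = print (parSum [1 .. 100] [101 .. 200])  -- prints 20100
```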
So the goal is to convert a Producer of Blocks into a Producer of lists of Blocks, as you write, but I’m not sure I understand your second example. I’m looking for a pipe that takes in a list of Blocks and produces a spark using “par” that creates a thread which starts converting this [Block] into a BlockState. This should continue until all cores are utilized converting [Block] -> BlockState. When a thread finishes, it sends its BlockState down the pipe. The order of BlockStates is important, so if a thread finishes out of order, the result will have to be queued. I’m not sure if this is too much to ask, but it’s what I have in mind.

In your example, as far as I can see, the individual Block elements of the block list are just processed in parallel, which doesn’t offer a speedup, since the folding of Blocks into a BlockState is sequential.

I’m thinking something like this:

    blockListToBlockStatePar :: Pipe [Block] (Either String BlockState) IO ()
    blockListToBlockStatePar = do
      bl <- await
      yield (processBlockList emptyBS bl) `par` blockListToBlockStatePar

    processBlockList :: BlockState -> [Block] -> Either String BlockState

But I get an error I don’t understand:

    Error:(106, 5) ghc: No instance for (Monad m0) arising from a use of ‘yield’
      The type variable ‘m0’ is ambiguous
      Note: there are several potential instances:
        instance Monad ((->) r) -- Defined in ‘GHC.Base’
        instance Monad IO -- Defined in ‘GHC.Base’
        instance Monad [] -- Defined in ‘GHC.Base’
        ...plus 25 others
      In the first argument of ‘par’, namely
        ‘yield (processBlockList emptyCS bl)’

/Rune

> On 05 Nov 2015, at 14:22, Michael Thompson <practical.wis...@gmail.com> wrote:
>
> Are you thinking of regular pure parallelism, as with `parallel` or
> `monad-par`, or of something fancier like the work-stealing example in the
> pipes concurrency tutorial (which isn't itself appropriate here, I think,
> since the order of events is important)?
>
> If you are thinking of pure parallelism, here is a flat-footed approach.
> In choosing a batch size you would be surveying the whole producer, so you
> can't think inside the pipeline. You can first freeze each batch to a list
> or something, say
>
>     batched :: Monad m => Int -> Producer a m x -> Producer [a] m x
>     batched n p = L.purely folds L.list (view (chunksOf n) p)
>
> then resume piping with something like
>
>     >>> :t \n f p -> batched n p >-> P.mapM (runParIO . parMap f) >-> P.concat
>     -- or: P.map (runPar . parMap f)
>     \n f p -> batched n p >-> P.mapM (runParIO . parMap f) >-> P.concat
>       :: NFData c =>
>          Int -> (a -> c) -> Producer a IO r -> Producer c IO r
>
> The equivalent could be done with `async`. You'd have to think out whether
> waiting to accumulate a batch and then processing simultaneously and
> continuing would be an improvement on processing blocks as they come.
>
> --
> You received this message because you are subscribed to a topic in the Google
> Groups "Haskell Pipes" group.
> To unsubscribe from this topic, visit
> https://groups.google.com/d/topic/haskell-pipes/FItX8aZ588g/unsubscribe.
> To unsubscribe from this group and all its topics, send an email to
> haskell-pipes+unsubscr...@googlegroups.com.
> To post to this group, send email to haskell-pipes@googlegroups.com.
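An aside on the error above: `par :: a -> b -> b` discards its first argument's type, so in `yield (...) `par` blockListToBlockStatePar` nothing pins down which monad that `yield` lives in, hence the ambiguous `m0`. Even if it compiled, it would spark the pipe *action* as a value, not the fold itself. One way to keep output order while sparking the pure result is sketched below, with placeholder `Block`/`BlockState` definitions, since the thread doesn't show the real ones:

```haskell
import Control.Parallel (par)
import Pipes
import qualified Pipes.Prelude as P

-- Placeholder types: stand-ins for the thread's real Block/BlockState.
type Block = Int
type BlockState = Int

emptyBS :: BlockState
emptyBS = 0

-- Placeholder fold: sums a block list into a state.
processBlockList :: BlockState -> [Block] -> Either String BlockState
processBlockList s = Right . foldl (+) s

-- Spark each result before yielding it.  Results are yielded in input
-- order, so ordering is preserved by construction.  Note that `par`
-- only evaluates to WHNF (here, the Right constructor); for real work
-- you would spark a deeply forced value, e.g. Control.DeepSeq.force.
blockListToBlockStatePar :: Monad m => Pipe [Block] (Either String BlockState) m r
blockListToBlockStatePar = for cat $ \bl ->
  let r = processBlockList emptyBS bl
  in r `par` yield r

main :: IO ()
main = do
  rs <- P.toListM (each [[1, 2, 3], [4, 5]] >-> blockListToBlockStatePar)
  print rs  -- [Right 6,Right 9]
```

This overlaps evaluation of each batch with downstream work rather than spawning managed threads; for the full "all cores busy, out-of-order results queued" design, the `async`/batching approach in the quoted reply is closer.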