Let's decompose this into two smaller problems:
* How to process the group in blocks of 100
* How to parallelize each block
The first step would be a function of type:

    groupAndFold :: Monad m => Producer Block m r -> Producer BlockState m r

This would use the `pipes-group` library to partition the stream
into groups of 100 elements and then fold each group.
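A minimal sketch of that first step, assuming `chunksOf` and `folds` from `pipes-group` together with `view` from `lens-family-core`. The `Block` and `BlockState` definitions, the `processBlock` body and `emptyBlockState` are placeholders invented only so the example compiles; the real types and starting state would come from your own code.

```haskell
import Lens.Family (view)
import Pipes (Producer, each)
import Pipes.Group (chunksOf, folds)
import qualified Pipes.Prelude as P

-- Placeholder types, assumed only for illustration.
newtype Block = Block Int
newtype BlockState = BlockState Int deriving (Eq, Show)

-- Hypothetical starting state for every group of blocks.
emptyBlockState :: BlockState
emptyBlockState = BlockState 0

-- Stand-in for the real per-block step (here it just sums the payloads).
processBlock :: BlockState -> Block -> BlockState
processBlock (BlockState s) (Block b) = BlockState (s + b)

-- Split the stream into groups of at most 100 blocks and fold each
-- group into a single BlockState.
groupAndFold :: Monad m => Producer Block m r -> Producer BlockState m r
groupAndFold p = folds processBlock emptyBlockState id (view (chunksOf 100) p)

-- Usage: 250 blocks become 3 states (groups of 100, 100 and 50 blocks).
demo :: [BlockState]
demo = P.toList (groupAndFold (each (map Block [1 .. 250])))
```

Each resulting `BlockState` starts from `emptyBlockState`, so the per-group results would still have to be merged afterwards with something like `consolidateBlockState`.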
Notice that we haven't actually evaluated anything by doing that
partition-and-fold step. Our final `Producer BlockState m r` is still a
producer of unevaluated thunks. So that leads us to the second step,
which is how to parallelize each block. The type of that would be
something like:

    parallelize :: Monad m => Producer a m r -> Producer a m r

Now we have a smaller and clearer problem to solve: how do we take an
arbitrary `Producer` that is yielding large unevaluated thunks and
speculatively evaluate them ahead of time so that they are ready when
we finally need them?
This is actually difficult to do, though, because you can't even *begin*
to evaluate the Nth element that the `Producer` yields without triggering
all side effects preceding that element in the `Producer`.
Let's take a simple example to illustrate the problem:

    example :: Producer Int IO ()
    example = do
        yield someExpensiveComputation1
        str1 <- lift getLine
        yield (someExpensiveComputationThatDependsOn str1)
        str2 <- lift getLine
        yield (anotherExpensiveComputationThatDependsOn str2)

The issue is that we can't even begin to evaluate
`anotherExpensiveComputationThatDependsOn str2` until we know the value
of `str2`, but that requires running all effects up to and including the
second `getLine` command. So the only way we can speculatively evaluate
a `Producer` is to force the entire producer, or at least force large
chunks of the `Producer` at a time (e.g. force 20 `BlockState`s' worth of
computation at a time).
So let's refine the type of our `parallelize` function:

    parallelize :: Monad m => Int -> Producer a m r -> Producer a m r

The first argument is how many elements to materialize and then
speculatively compute at one time. This will materialize the `Producer`
in chunks of the given size, spark off their evaluation and then
re-yield them:

    import Control.Foldl (list, purely)
    import Control.Parallel (par)
    import Lens.Family.State.Strict (zoom)
    import Pipes (Producer, each, lift)
    import Pipes.Parse (foldAll, runStateT, splitAt)
    import Prelude hiding (splitAt)

    parallelize :: Monad m => Int -> Producer a m r -> Producer a m r
    parallelize n p = do
        -- Drain up to n elements into a list; p' is the rest of the stream
        let parser = zoom (splitAt n) (purely foldAll list)
        (as, p') <- lift (runStateT parser p)
        if null as
            then p'  -- nothing was drained, so stop recursing
            -- Spark evaluation of the chunk (to weak head normal form)
            else as `par` (each as >> parallelize n p')

I haven't yet tested that the above code works; I only verified that it
type-checks. However, that is probably close to the best you will be
able to do using `pipes`.
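One caveat worth illustrating: `par` only evaluates its first argument to weak head normal form, so sparking the list `as` itself forces just its first cons cell and may leave the elements as thunks. The following is a sketch of a variant, not a tested drop-in replacement, that assumes `parList`, `rseq` and `using` from `Control.Parallel.Strategies` (the `parallel` package) to create one spark per element. The name `parallelizeElems` is hypothetical, and the variant also stops recursing once a chunk comes back empty.

```haskell
import Control.Foldl (list, purely)
import Control.Parallel.Strategies (parList, rseq, using)
import Lens.Family.State.Strict (zoom)
import Pipes (Producer, each, lift)
import Pipes.Parse (foldAll, runStateT, splitAt)
import qualified Pipes.Prelude as P
import Prelude hiding (splitAt)

-- Hypothetical variant of parallelize that sparks every element of a chunk.
parallelizeElems :: Monad m => Int -> Producer a m r -> Producer a m r
parallelizeElems n p = do
    -- Drain up to n elements into a list; p' is the rest of the stream.
    (as, p') <- lift (runStateT (zoom (splitAt n) (purely foldAll list)) p)
    if null as
        -- Nothing was drained: continue with the remainder and stop recursing.
        then p'
        -- parList rseq creates one spark per element, each evaluated to WHNF.
        else each (as `using` parList rseq) >> parallelizeElems n p'

-- Usage: the elements come out in their original order.
demo :: [Int]
demo = P.toList (parallelizeElems 3 (each [1 .. 10]))
```

Sparks only run in parallel when the program is built with `-threaded` and run with `+RTS -N`. If the elements are nested structures, `rseq` may be too shallow and `rdeepseq` (which needs an `NFData` instance) would be the alternative to consider.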
On 11/7/2015 11:24 AM, Rune Kjær Svendsen wrote:
Yes. I see that I was less clear in my original message than I could
have been. The slow/fast function isn't really relevant. Allow me to
start over :)
I have a function:

    processBlock :: BlockState -> Block -> BlockState

which folds a Block into a BlockState that has been accumulated from the
Blocks preceding the Block in question.
As such, this function only allows sequential operation on a
list/stream of blocks.
I also have a function which combines two BlockStates into one:

    consolidateBlockState :: BlockState -> BlockState -> BlockState

where the first BlockState is accumulated from Blocks prior to the
Blocks from which the latter BlockState is made.
This allows me to turn the otherwise sequential operation into a
parallel one. I just can't figure out how to get "par" and "pseq"
working inside a Pipe, in order to fold multiple sets of Blocks into
BlockStates in parallel (on multiple CPU cores).
/Rune
On 07 Nov 2015, at 19:49, Michael Thompson <practical.wis...@gmail.com> wrote:
Right, I think I misunderstood the original message as saying that
there was a slow function

    Block -> BlockState

and that one could fold the BlockStates up monoidally. But I guess
the slow function is

    BlockState -> Block -> BlockState
--
You received this message because you are subscribed to a topic in
the Google Groups "Haskell Pipes" group.
To unsubscribe from this topic, visit
https://groups.google.com/d/topic/haskell-pipes/FItX8aZ588g/unsubscribe.
To unsubscribe from this group and all its topics, send an email to
haskell-pipes+unsubscr...@googlegroups.com
<mailto:haskell-pipes+unsubscr...@googlegroups.com>.
To post to this group, send email to haskell-pipes@googlegroups.com
<mailto:haskell-pipes@googlegroups.com>.