Let's decompose this into solving two smaller problems:

* How to process the group in blocks of 100
* How to parallelize each block

The first step would be a function of type:

    groupAndFold :: Monad m => Producer Block m r -> Producer BlockState m r

And this would use the `pipes-group` library to partition the stream into groups of 100 elements and then fold each group.
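A minimal sketch of that first step, assuming `chunksOf` and `folds` from `pipes-group` and `view` from `lens-family-core`. The fold step and the starting state are taken as parameters (called `step` and `begin` here purely for illustration), since the thread does not say what an empty `BlockState` looks like. `demo` is a hypothetical usage on plain `Int`s, not part of the original problem:

```haskell
import Lens.Family (view)
import Pipes (Producer, each)
import Pipes.Group (chunksOf, folds)
import qualified Pipes.Prelude as P

-- Partition the stream into groups of 100 elements and fold each group
-- with the supplied step function, yielding one result per group.
-- `step` and `begin` are illustrative parameters, not names from the thread.
groupAndFold
    :: Monad m
    => (s -> a -> s)   -- fold step, e.g. processBlock
    -> s               -- state that every group starts from
    -> Producer a m r
    -> Producer s m r
groupAndFold step begin = folds step begin id . view (chunksOf 100)

-- Hypothetical usage: sum the numbers 1..250 in groups of 100.
-- The last group holds only the 50 leftover elements.
demo :: [Int]
demo = P.toList (groupAndFold (+) 0 (each [1 .. 250]))
```

With these stand-ins `demo` should contain one sum per group (1..100, 101..200 and 201..250). For the real problem you would pass `processBlock` as `step` and whatever empty `BlockState` you have as `begin`.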

Notice that we haven't actually evaluated anything by doing that partition-and-fold step. Our final `Producer BlockState m r` is still a producer of unevaluated thunks. So that leads us to the second step, which is how to parallelize each block. The type of that would be something like:

    parallelize :: Monad m => Producer a m r -> Producer a m r

Now we have a smaller and clearer problem to solve: how do we take an arbitrary `Producer` that is yielding large unevaluated thunks and speculatively evaluate them ahead of time so that they are ready when you finally need them?

This is actually difficult to do, though, because you can't even *begin* to evaluate the Nth element that the `Producer` yields without triggering all side effects preceding that element in the `Producer`.

Let's take a simple example to illustrate the problem:

    example :: Producer Int IO ()
    example = do
        yield someExpensiveComputation1
        str1 <- lift getLine
        yield (someExpensiveComputationThatDependsOn str1)
        str2 <- lift getLine
        yield (anotherExpensiveComputationThatDependsOn str2)

The issue is that we can't even begin to evaluate `anotherExpensiveComputationThatDependsOn str2` until we know the value of `str2`, but that requires forcing all effects leading up to the second `getLine` command. So the only way we can speculatively evaluate a `Producer` is to force the entire `Producer`, or at least force large chunks of it at a time (e.g. force 20 `BlockState`s' worth of computation at a time).

So let's refine the type of our `parallelize` function:

    parallelize :: Monad m => Int -> Producer a m r -> Producer a m r

The first argument will be how many elements to materialize and then speculatively compute at one time. This will materialize the `Producer` in chunks of the given size, spark off their evaluation and then re-yield them.

    import Control.Foldl (list, purely)
    import Control.Parallel (par)
    import Lens.Family.State.Strict (zoom)
    import Pipes (Producer, each, lift)
    import Pipes.Parse (foldAll, runStateT, splitAt)
    import Prelude hiding (splitAt)

    parallelize :: Monad m => Int -> Producer a m r -> Producer a m r
    parallelize n p = do
        -- Materialize up to n elements of the producer as a list
        let parser = zoom (splitAt n) (purely foldAll list)
        (as, p') <- lift (runStateT parser p)
        if null as
            -- Nothing left to draw: finish with the remainder's return value
            then p'
            -- Spark each element, then re-yield the chunk and continue
            else foldr par (each as >> parallelize n p') as

I haven't yet tested that the above code works; I only verified that it type-checks. However, that is probably close to the best you will be able to do using `pipes`.
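The spark-then-consume idea behind `parallelize`, and the reason folding in chunks helps at all, can be tried on a plain list without `pipes`. The sketch below is an illustration only: `Block` and `BlockState` are toy stand-ins (plain `Int`s), `processBlock` and `consolidateBlockState` are stand-in sums rather than your real functions, and `emptyState`, `chunksOfList`, `sparkAll` and `parallelFold` are hypothetical helpers. `par` is taken from `GHC.Conc` in `base` so that the example needs no extra packages:

```haskell
import Data.List (foldl')
import GHC.Conc (par)

-- Toy stand-ins for the real types; both are assumptions for illustration.
type Block = Int
type BlockState = Int

-- Assumed empty state; must be neutral for consolidateBlockState.
emptyState :: BlockState
emptyState = 0

-- Stand-in for the sequential fold step.
processBlock :: BlockState -> Block -> BlockState
processBlock = (+)

-- Stand-in for merging an earlier state with a later one.
consolidateBlockState :: BlockState -> BlockState -> BlockState
consolidateBlockState = (+)

-- Split a list into chunks of at most n elements (n > 0 assumed).
chunksOfList :: Int -> [a] -> [[a]]
chunksOfList _ [] = []
chunksOfList n xs = let (h, t) = splitAt n xs in h : chunksOfList n t

-- Spark every element of a materialized chunk, then return the chunk.
-- Each element is only evaluated to weak head normal form.
sparkAll :: [a] -> [a]
sparkAll xs = foldr par xs xs

-- Fold each chunk independently, spark the per-chunk results, and
-- consolidate them in their original order.
parallelFold :: Int -> [Block] -> BlockState
parallelFold n blocks =
    let perChunk = map (foldl' processBlock emptyState) (chunksOfList n blocks)
    in  foldl' consolidateBlockState emptyState (sparkAll perChunk)
```

The result is the same whether or not the sparks actually run on other cores; to see any speedup the program would need to be built with `-threaded` and run with `+RTS -N`. If a `BlockState` is a large structure, sparking to weak head normal form may not force enough of it, so a deeper evaluation strategy may be needed.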

On 11/7/2015 11:24 AM, Rune Kjær Svendsen wrote:
Yes. I see that I was less clear in my original message than I could have been. The slow/fast function isn't really relevant. Allow me to start over :)

I have a function:

    processBlock :: BlockState -> Block -> BlockState

which folds a Block into a BlockState which has been accumulated from Blocks previous to the Block in question.

As such, this function only allows sequential operation on a list/stream of blocks.

I also have a function which combines two BlockStates into one:

    consolidateBlockState :: BlockState -> BlockState -> BlockState

where the first BlockState is accumulated from Blocks prior to the Blocks from which the latter BlockState is made.

This allows me to turn the otherwise sequential operation into a parallel one. I just can't figure out how to get "par" and "pseq" working inside a Pipe, in order to fold multiple sets of Blocks into BlockStates in parallel (on multiple CPU cores).


/Rune


On 07 Nov 2015, at 19:49, Michael Thompson <practical.wis...@gmail.com> wrote:


Right, I think I misunderstood the original message as saying that there was a slow function

     Block -> BlockState

and that one could fold the BlockStates up monoidally. But I guess the slow function is

    BlockState -> Block -> BlockState


--
You received this message because you are subscribed to a topic in the Google Groups "Haskell Pipes" group. To unsubscribe from this topic, visit https://groups.google.com/d/topic/haskell-pipes/FItX8aZ588g/unsubscribe. To unsubscribe from this group and all its topics, send an email to haskell-pipes+unsubscr...@googlegroups.com <mailto:haskell-pipes+unsubscr...@googlegroups.com>. To post to this group, send email to haskell-pipes@googlegroups.com <mailto:haskell-pipes@googlegroups.com>.
