Thanks, Daniel, that was a really helpful suggestion! In case someone
else is ever looking for an example of using ExceptT and Control.Foldl
to validate/process data from a Producer, stopping immediately if any
of the operations experiences an error, here's a small program
demonstrating how to do it:
http://pastebin.com/HgXQRm3p
This example produces a list of pairs, and then folds over them to
validate some ordering properties of the pairs, and also prints the
pairs out as they're produced. Running the example shows that if any
of the validators experiences an error, the computation is terminated
and we get the error back immediately.
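For readers without access to the link, here is a minimal sketch of the same idea. It is not the linked program: the validator names (`orderedWithin`, `increasingFirsts`) and the sample data are made up for illustration. Each validator is a `FoldM` in `ExceptT String`, so `throwE` inside a step function aborts the whole combined fold.

```haskell
import           Control.Monad.Trans.Class  (lift)
import           Control.Monad.Trans.Except (ExceptT, runExceptT, throwE)
import qualified Control.Foldl              as L
import           Pipes                      (each)
import qualified Pipes.Prelude              as P

type Pair = (Int, Int)

-- Hypothetical validator: within each pair, the first component must not
-- exceed the second.
orderedWithin :: Monad m => L.FoldM (ExceptT String m) Pair ()
orderedWithin = L.FoldM step (return ()) return
  where
    step () (a, b)
        | a <= b    = return ()
        | otherwise = throwE ("pair out of order: " ++ show (a, b))

-- Hypothetical validator: first components must strictly increase from one
-- pair to the next. The fold state holds the previous first component.
increasingFirsts :: Monad m => L.FoldM (ExceptT String m) Pair ()
increasingFirsts = L.FoldM step (return Nothing) (\_ -> return ())
  where
    step Nothing (a, _) = return (Just a)
    step (Just prev) (a, _)
        | a > prev  = return (Just a)
        | otherwise = throwE ("first component did not increase: " ++ show a)

-- Print each pair, then run both validators on it, all in a single pass.
validateAll :: L.FoldM (ExceptT String IO) Pair ()
validateAll = L.mapM_ (lift . print) *> orderedWithin *> increasingFirsts

main :: IO ()
main = do
    result <- runExceptT (L.impurely P.foldM validateAll (each pairs))
    print result
  where
    pairs = [(1, 2), (2, 3), (4, 3), (5, 6)]
```

With this sample data the run should print the first three pairs and then the `Left` value from `orderedWithin`; the pair `(5, 6)` is never drawn from the producer, because the failing step short-circuits the rest of the `ExceptT` computation.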
Thanks again,
Dylan
On Friday, October 2, 2015 at 10:31:18 AM UTC-4, Daniel Díaz wrote:
> What's unclear to me is how to make all of the combined `Fold`s
> stop if any one of them encounters an error state.
You could try using a monadic fold that works in the ExceptT
<http://hackage.haskell.org/package/transformers-0.4.3.0/docs/Control-Monad-Trans-Except.html>
monad transformer. Such folds can "stop early".
On Friday, October 2, 2015 at 3:47:00 PM UTC+2, Dylan Tisdall wrote:
Maybe I should ask a more general question, in case I'm going
about this all wrong with my previous detailed question. I'm
processing a file that consists of millions of "records". The
records (`MDHAndScanLine`) were generated by a process with a
series of nested loops, so each record contains data, and the
loop counters when the data was generated. Having parsed these
records into a producer of type `P.Producer MDHAndScanLine m
(Either String ())`, I've basically flattened out all the loops
into a single list. My problem is now that I've got multiple
operations I'd like to do in one pass down the `Producer`
(e.g., scan through it and make sure that the loop counters
are all in the order I expect; do processing of type A to the
data and output the result; do processing of type B on only
data with certain loop counters and output the result).
Ideally, if any of these operations encounters a failure, I'd
like to report the error and bail on the whole thing.
I was originally thinking of making each one of these
operations a Pipe, and having them stop `yield`-ing and return
errors immediately if something goes wrong; that's what led
to my questions about FreeT. However, based on the ongoing
thread "Help folding a Producer into a value", I'm getting the
sense that Control.Foldl might be the better tool for my
problem. I can see that if I write each of my operations as a
`Fold` (or `FoldM` for logging output along the way), then I
can combine them all into one joint operation that will
traverse the `Producer`. I can see that you can use the fold's
state to keep track of whether you've encountered an error,
and then the fold's step function can just stop updating on
new data. What's unclear to me is how to make all of the
combined `Fold`s stop if any one of them encounters an error state.
Also, the nice part of the Pipe-style solution I described
first is that I don't think it'll keep reading the file once
it encounters an error (it will stop `await`-ing once it
returns an error). My impression is that the Fold approach
will insist on traversing the whole file every time, and just
ignore all the data once it's had an error. Or perhaps I've
totally missed something about Fold that allows it to stop
early if it has its result.
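To make the concern concrete, here is a minimal sketch, assuming a made-up validator `firstNegative`, of a pure `Fold` that records an error in its state. Combining it with another fold through `Applicative` gives a single pass, but nothing in a pure `Fold` can stop the traversal: after a failure the step function simply ignores further input.

```haskell
import qualified Control.Foldl as L

-- Hypothetical validator: remembers the first negative number it sees.
firstNegative :: L.Fold Int (Either String ())
firstNegative = L.Fold step (Right ()) id
  where
    step (Left err) _ = Left err   -- already failed: ignore new input
    step (Right ()) n
        | n < 0     = Left ("negative value: " ++ show n)
        | otherwise = Right ()

-- Combine the validator with an unrelated fold in one pass.
validateAndCount :: L.Fold Int (Either String (), Int)
validateAndCount = (,) <$> firstNegative <*> L.length

main :: IO ()
main = print (L.fold validateAndCount [1, -2, 3, 4])
-- prints (Left "negative value: -2",4)
```

The count of 4 shows that every element was still visited after the error at the second element, which is the behaviour described above.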
Sorry for the confusion! These tools all seem really helpful,
but I'm still trying to figure out how to put them together
the right way.
Thanks again,
Dylan
On Thursday, October 1, 2015 at 5:11:12 PM UTC-4, Dylan
Tisdall wrote:
Thanks Michael and Gabriel for your help; despite
appearances here, I am actually making headway on my project!
I've run into a new point of confusion, though, related to
how to allow failure in parsing over groups. More explicitly:
My data naturally breaks into groups, so I've got a method
|
groupByID :: Monad m => P.Producer MDHAndScanLine m (Either String ()) ->
P.FreeT (P.Producer MDHAndScanLine m) m (Either String ())
|
That takes my Producer of MDHAndScanLines and breaks them
using the relevant comparison. My question is now how to
parse this new FreeT structure when it's possible that the
parsing might fail. For example, I want to process each
group, making them into a new data type `GroupedMDHs`, but
it's possible I'll discover as I'm consuming the data that
my preconditions for making a `GroupedMDHs` are violated,
at which point I want to fail. So I want a method like:
|
example :: Monad m => P.FreeT (P.Producer MDHAndScanLine m) m (Either String ()) ->
P.Producer GroupedMDHs m (Either String ())
|
where I return `Left "Error"` if either the underlying
Producer returns a `Left`, or if I discover I can't make a
GroupedMDH.
To make my example more concrete, say I have a function:
|
groupedMDHParser :: Monad m => Int -> P.Parser MDHAndScanLine m
(Either (Either String ()) GroupedMDHs)
|
This parser consumes all the input from a producer, and
makes a `GroupedMDHs`. However, if the length of data
consumed doesn't match the first `Int` argument, then it
returns `Left $ Left "Error"` and otherwise it returns a
`GroupedMDHs`. I chose the type signature to be compatible
with `parseForever`, since that would turn this parser
into a pipe that matched my original producer type
`P.Producer MDHAndScanLine m (Either String ())`. I've tried
to find a way to apply this pipe to each element of the
FreeT "list" in turn, but the return types don't match up.
I understand at a conceptual level that I can't just map
my transformation over each element of the FreeT, because
if the application of `groupedMDHParser` fails, it's
implicitly also truncating the FreeT structure at that
point and returning early. So this doesn't work:
|
|
example groupedMDHScanLines = P.concats $ P.maps (\p -> p
>-> groupedMDHParser) groupedMDHScanLines
|
|
I can also see that Pipes.Group.folds is a candidate, but
I don't see how I can get it to exit early if processing
one of the groups fails. Is there a standard idiom for
that kind of behaviour?
Thanks again,
Dylan
On Sunday, September 27, 2015 at 12:05:52 PM UTC-4,
Gabriel Gonzalez wrote:
Like Michael mentioned, you want `Pipes.Prelude.fold'`.
If you know that your `FreeT` list only has one
`Producer`, then you should encode that in the type by
keeping it as a `Producer`. Then the question becomes
how to fold that `Producer` directly instead of
folding it within the context of a `FreeT` list, and
that's what the `fold'` function does: it folds the
producer and also preserves the return value:
fold' :: Monad m => (x -> a -> x) -> x -> (x -> b)
-> Producer a m r -> m (b, r)
... or combined with the `foldl` library it would be:
purely fold' :: Monad m => Fold a b -> Producer a
m r -> m (b, r)
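As a small usage sketch of the second form (the producer `numbers` and its return value are made up for illustration), `purely fold'` runs a `Fold` over a `Producer` and hands back both the fold result and the producer's own return value:

```haskell
import qualified Control.Foldl as L
import           Pipes         (Producer, each)
import qualified Pipes.Prelude as P

-- Hypothetical producer with a non-() return value.
numbers :: Monad m => Producer Int m String
numbers = do
    each [1, 2, 3]
    return "done"

main :: IO ()
main = do
    -- total is the fold's result; r is what the producer returned.
    (total, r) <- L.purely P.fold' L.sum numbers
    print (total, r)
-- prints (6,"done")
```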
On 9/24/2015 8:47 PM, Dylan Tisdall wrote:
I have a quick follow-up question, actually;
pipes-group defines:
Pipes.Group.folds
:: Monad m
=> (x -> a -> x)
-- ^ Step function
-> x
-- ^ Initial accumulator
-> (x -> b)
-- ^ Extraction function
-> FreeT (Producer a m) m r
-- ^
-> Producer b m r
If I'm reading this right, when my FreeT "list"
consists of just one Producer, then
Pipes.Group.folds returns a Producer that yields
one output, and preserves the original Producer's
return type, r, in the returned Producer. This is
in contrast to the similar function
Pipes.Prelude.fold :: Monad m => (x -> a -> x) ->
x -> (x -> b) -> Producer a m () -> m b
which only works on Producers with return type ().
You note in the documentation for
Pipes.Prelude.fold that this type is required
because it may stop drawing from the Producer
early, so you don't necessarily get to compute the
return type. I'm wondering if it's easy to define
a function
foldToProducer :: Monad m => (x -> a -> x) -> x ->
(x -> b) -> Producer a m r -> Producer b m r
that does what I think Pipes.Group.folds is doing,
but without needing all the FreeT bits as well. As
an exercise, I tried to write foldToProducer, but
couldn't figure it out.
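One possible way to write it, sketched here on the assumption that `Pipes.Prelude.fold'` (mentioned elsewhere in this thread) is available: run the fold in the base monad, yield its single result, and pass the original return value through. The producer `numbers` is a made-up example.

```haskell
import           Control.Monad.Trans.Class (lift)
import           Pipes                     (Producer, each, for, runEffect, yield)
import qualified Pipes.Prelude             as P

-- Fold the whole producer, yield the one result, keep the return value.
foldToProducer
    :: Monad m
    => (x -> a -> x) -> x -> (x -> b) -> Producer a m r -> Producer b m r
foldToProducer step begin done p = do
    (b, r) <- lift (P.fold' step begin done p)
    yield b
    return r

-- Hypothetical producer with a non-() return value.
numbers :: Monad m => Producer Int m String
numbers = do
    each [1, 2, 3]
    return "done"

main :: IO ()
main = do
    -- Print the single folded value, then the preserved return value.
    r <- runEffect (for (foldToProducer (+) 0 id numbers) (lift . print))
    putStrLn r
```

Note that this version yields nothing until the input producer is exhausted, which is inherent in folding the whole stream into one value.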
Thanks again,
Dylan
On Thursday, September 24, 2015 at 11:20:06 PM
UTC-4, Dylan Tisdall wrote:
Right, I wasn't recognizing that `Producer`
was an instance of `Functor` since it's an
instance of `Monad`, so I wasn't even looking
there. Thanks again for all your help!
On Tuesday, September 22, 2015 at 6:56:49 PM
UTC-4, Gabriel Gonzalez wrote:
Use the `void` function from
`Control.Monad` if you want to erase the
return type of a `Producer`:
void :: Functor f => f a -> f ()
void = fmap (\_ -> ())
I might even re-export this from `pipes`
as a convenience since this question comes
up a lot.
Originally functions like
`Pipes.Prelude.length` had a more general
type like this:
Pipes.Prelude.length :: Producer a m r ->
m Int
... but then at the advice of others I
restricted the type to this:
Pipes.Prelude.length :: Producer a m () ->
m Int
... so that the user would have to
explicitly discard the return value to
signal that they were okay with ignoring
that data. This is similar in principle
to the warning you get if you turn on the
`-Wall` flag that (among other things)
warns if you have an unused non-empty
return value, like this:
example = do
    getLine -- Compiler warning because you didn't use the result
    ...
... and you usually have to explicitly
ignore the value using something like this
syntax to indicate that you are
intentionally ignoring the value:
example = do
    _ <- getLine
    ...
So the requirement to explicitly discard
the value using `void` is in the same
spirit as that compiler warning.
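A minimal sketch of that usage, with a made-up producer `letters` standing in for one that has a non-`()` return value:

```haskell
import           Control.Monad (void)
import           Pipes         (Producer, each)
import qualified Pipes.Prelude as P

-- Hypothetical producer whose return value we are willing to discard.
letters :: Monad m => Producer Char m String
letters = do
    each "abc"
    return "leftover result"

main :: IO ()
main = do
    -- P.length requires a return type of (), so discard the String first.
    n <- P.length (void letters)
    print n
-- prints 3
```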
On 9/22/15 3:50 PM, Dylan Tisdall wrote:
Hi Gabriel,
Thanks again for your help. That
really clarified that I should be
using lift to keep everything inside
the Producer transformer. To make all
the types work, I ended up with:
type MDHAndScanLineProducer =
    P.Producer MDHAndScanLine IO
        (Either (P.DecodingError, P.Producer P.ByteString IO ()) ())

measDatMDHScanLinePairs :: Handle -> MDHAndScanLineProducer
measDatMDHScanLinePairs h = do
    (hLen, leftovers) <- lift $ P.runStateT (P.decodeGet getWord32le) p
    case (hLen :: Either P.DecodingError Word32) of
        Left err -> return $ Left (err, leftovers)
        Right len -> do
            lift (hSeek h AbsoluteSeek (fromIntegral len))
            view P.decoded p
  where
    p = PB.fromHandle h
This seems to work exactly as I'd hoped.
As a follow-up, I'm now wondering how
to use this producer and ignore its
return type; effectively how to turn
it into a Producer MDHAndScanLine IO
(). This seems to be necessary to
access many library functions. For
example, I can't use
Pipes.Prelude.length :: Monad m =>
Producer a m () -> m Int
directly on the output of
measDatMDHScanLinePairs because the
return type doesn't match.
Thanks again for all your help as I
get up to speed on this!
Dylan
On Monday, September 21, 2015 at
11:43:58 PM UTC-4, Gabriel Gonzalez
wrote:
You're definitely on the right
track. The type I would aim for
would be something like this:
example :: Handle -> Producer
MDHAndScanLine IO (Either
DecodingError (Producer ByteString
IO ()))
Notice that this slightly differs
from your type; I'm merging the
outer `IO (Either DecodingError
...)` into the first `Producer` to
simplify the type.
The implementation for that type
would be very similar to the one
you wrote in your second e-mail:
example :: Handle -> Producer MDHAndScanLine IO
    (Either DecodingError (Producer ByteString IO ()))
example handle = do
    let p = Pipes.ByteString.fromHandle handle
    x <- lift (evalStateT (decodeGet getWord32le) p)
    case x of
        Left err -> return (Left err)
        Right len -> do
            lift (hSeek handle AbsoluteSeek (fromIntegral len))
            view decoded p
That will definitely run in
constant memory, meaning that it
won't ever load more than one
chunk of bytes at a time (where a
chunk is something like 32 kB, I
think). You can profile the heap
if you want to verify this by
following these instructions:
https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/prof-heap.html
Also, to answer your other
question, `pipes-attoparsec` runs
in constant memory. The
difference between
`pipes-attoparsec` and
`attoparsec` is that
`pipes-attoparsec` runs a separate
parser for each element in the
stream, which is equivalent to
"committing" after each parsed
element. That means that it can
only backtrack while parsing a
single element in the stream, but
no further back. This is why
`pipes-attoparsec` runs in
constant space over a large file
and why `attoparsec` does not,
because `attoparsec` backtracks
indefinitely and
`pipes-attoparsec` does not.
On 9/21/15 12:10 PM, Dylan Tisdall
wrote:
Following up on my last
question, my next issue is
also probably a very straight
ahead example of pipes, but
I've managed to get tangled up
going back and forth in the
packages' documentation.
I've got a file whose first 4
bytes give the offset into the
file of a series of binary
data elements (called MDHs in
my case). Given a Handle to
the start of such a file, I
want to:
1. read the first Word32 in
the file, to retrieve the offset;
2. skip the Handle to that
offset; and
3. turn the rest of the file
into a Producer MDH IO ()
Given that the file I'm
reading may be large, I want
to make sure this process is
going to run in constant
memory. I thought I could use
pipes-attoparsec, but I
couldn't get straight whether
it would need to read the
whole file before it could
produce anything (as I
understand is normally the
case with attoparsec).
So far I have the following,
which isn't complete, but at
least does the skip and
converts the remaining file to
a ByteString producer.
|
handleToMDHs :: Handle -> IO (Either P.DecodingError (P.Producer P.ByteString IO ()))
handleToMDHs h = do
    hLen <- P.evalStateT (P.decodeGet getWord32le) (PB.fromHandle h)
    case (hLen :: Either P.DecodingError Word32) of
        Left err -> return $ Left err
        Right len ->
|
...