Thank you!  There is a lot to note regarding how pipes might work with
lenses.  Have a great day.  - E

On Sun, Feb 7, 2016 at 10:21 PM, Gabriel Gonzalez <gabriel...@gmail.com>
wrote:

> Here you go.  This program outputs a cumulative average in the last column:
>
>     {-# LANGUAGE OverloadedStrings #-}
>     {-# LANGUAGE TemplateHaskell   #-}
>
>     import Control.Applicative
>     import Control.Exception (throwIO)
>     import Control.Foldl (Fold)
>     import Data.ByteString (ByteString)
>     import Data.Csv (FromNamedRecord(..), ToNamedRecord(..), (.:), (.=))
>     import Lens.Simple (makeLensesBy)
>     import Pipes (Effect, Producer, for, lift, yield, (>->))
>
>     import qualified Control.Foldl    as Foldl
>     import qualified Data.Csv         as Csv
>     import qualified Data.Vector      as Vector
>     import qualified Pipes
>     import qualified Pipes.Prelude
>     import qualified Pipes.ByteString
>     import qualified Pipes.Csv
>     import qualified System.IO        as IO
>
>     data InputRow = InputRow
>         { input_record_id :: Int
>         , input_val_t1    :: Double
>         , input_val_t2    :: Double
>         , input_val_t3    :: Double
>         , input_val_t4    :: Double
>         }
>
>     instance FromNamedRecord InputRow where
>         parseNamedRecord m = do
>             InputRow
>                 <$> (m .: "record_id")
>                 <*> (m .: "val_t1"   )
>                 <*> (m .: "val_t2"   )
>                 <*> (m .: "val_t3"   )
>                 <*> (m .: "val_t4"   )
>
>     makeLensesBy (\n -> Just (n ++ "_lens")) ''InputRow
>
>     data OutputRow = OutputRow
>         { output_record_id :: Int
>         , output_val_t1    :: Double
>         , output_val_t2    :: Double
>         , output_val_t3    :: Double
>         , output_val_t4    :: Double
>         , output_avg_t1_t4 :: Double
>         }
>
>     instance ToNamedRecord OutputRow where
>         toNamedRecord outputRow =
>             Csv.namedRecord
>                 [ "record_id" .= output_record_id outputRow
>                 , "val_t1"    .= output_val_t1    outputRow
>                 , "val_t2"    .= output_val_t2    outputRow
>                 , "val_t3"    .= output_val_t3    outputRow
>                 , "val_t4"    .= output_val_t4    outputRow
>                 , "avg_t1_t4" .= output_avg_t1_t4 outputRow
>                 ]
>
>     fold :: Fold InputRow (Maybe OutputRow)
>     fold =
>         f   <$> Foldl.last
>             <*> Foldl.handles input_val_t1_lens Foldl.sum
>             <*> Foldl.handles input_val_t2_lens Foldl.sum
>             <*> Foldl.handles input_val_t3_lens Foldl.sum
>             <*> Foldl.handles input_val_t4_lens Foldl.sum
>             <*> Foldl.genericLength
>       where
>         f mInputRow sum_t1 sum_t2 sum_t3 sum_t4 len = do
>             inputRow <- mInputRow
>             let avg = (sum_t1 + sum_t2 + sum_t3 + sum_t4) / (4 * len)
>             let outputRow =
>                     OutputRow
>                         { output_record_id = input_record_id inputRow
>                         , output_val_t1    = input_val_t1    inputRow
>                         , output_val_t2    = input_val_t2    inputRow
>                         , output_val_t3    = input_val_t3    inputRow
>                         , output_val_t4    = input_val_t4    inputRow
>                         , output_avg_t1_t4 = avg
>                         }
>             return outputRow
>
>     main =
>         IO.withFile "input.csv"  IO.ReadMode  (\handleIn  -> do
>         IO.withFile "output.csv" IO.WriteMode (\handleOut -> do
>
>         let input :: Producer ByteString IO ()
>             input = Pipes.ByteString.fromHandle handleIn
>
>         let handleRow :: Either String InputRow -> Producer InputRow IO ()
>             handleRow (Left str)  = lift (throwIO (userError str))
>             handleRow (Right row) = yield row
>
>         let inputRows :: Producer InputRow IO ()
>             inputRows = for (Pipes.Csv.decodeByName input) handleRow
>
>         let outputRows :: Producer OutputRow IO ()
>             outputRows =
>                     inputRows
>                 >-> Foldl.purely Pipes.Prelude.scan fold
>                 >-> Pipes.Prelude.concat
>
>         let header =
>                 Vector.fromList
>                     [ "record_id"
>                     , "val_t1"
>                     , "val_t2"
>                     , "val_t3"
>                     , "val_t4"
>                     , "avg_t1_t4"
>                     ]
>
>         let output :: Producer ByteString IO ()
>             output = outputRows >-> Pipes.Csv.encodeByName header
>
>         let effect :: Effect IO ()
>             effect = output >-> Pipes.ByteString.toHandle handleOut
>
>         Pipes.runEffect effect ))
>
> Example input:
>
>     $ cat input.csv
>     record_id,val_t1,val_t2,val_t3,val_t4
>     34598,45.09,42.73,40.82,35.92
>     45523,25.56,56.23,67.32,14.83
>
> Example output:
>
>     $ cat output.csv
>     record_id,val_t1,val_t2,val_t3,val_t4,avg_t1_t4
>     34598,45.09,42.73,40.82,35.92,41.14
>     45523,25.56,56.23,67.32,14.83,41.0625
>
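The two example rows make the cumulative average easy to check by hand. The following is a minimal sketch that uses plain lists and only `base` rather than pipes; `Row` and `cumulativeAverages` are hypothetical names introduced for illustration and are not part of the program above.

```haskell
import Data.List (scanl')

-- One row of readings, val_t1 .. val_t4 (record_id omitted for brevity).
type Row = [Double]

-- Running average over every value seen so far, one result per row.
-- The state is (sum of all values, number of values).
cumulativeAverages :: [Row] -> [Double]
cumulativeAverages rows =
    [ total / fromIntegral count
    | (total, count) <- drop 1 (scanl' step (0, 0 :: Int) rows)
    ]
  where
    -- Fold one more row into the running sum and count.
    step (total, count) row = (total + sum row, count + length row)
```

For the two rows in input.csv this gives approximately 41.14 (164.56 / 4) and 41.0625 (328.50 / 8), matching the last column of output.csv. The `drop 1` plays the same role as `Pipes.Prelude.concat` in the program: `Pipes.Prelude.scan` also emits its initial state, which is `Nothing` there because `Foldl.last` has not yet seen a row.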
> On 02/03/2016 01:13 PM, Edmund Cape wrote:
>
> Yes. It's supposed to be the average. Sorry for the confusion. - E
>
> Edmund Cape PhD MBA
> 917-715-8299
>
> On Feb 2, 2016, at 10:31 PM, Gabriel Gonzalez <gabriel...@gmail.com>
> wrote:
>
> Where does the `47.20` result come from?  It doesn't seem to be the
> average of those values.
>
> The only reason I'm asking is that I was trying to work out whether
> `avg_t1_t4` was supposed to be the average of just its own row or of all
> preceding rows.
>
> On 02/01/2016 05:32 PM, Edmund Cape wrote:
>
> Thank you Gabriel.  A rich, helpful response.  You captured the summaries
> of the field values as they stream in.  What I don't yet see is how to
> fold across the fields within a single row, for instance a time series
> captured in each row.  E.g., if this is my data:
>
> record_id val_t1, val_t2, val_t3, val_t4
> 34598     45.09,  42.73,  40.82,  35.92
> 45523     25.56,  56.23,  67.32,  14.83
>
> How should I go about computing the derived field avg_t1_t4, where the
> average is taken over val_t1 through val_t4?  The result will be consumed
> by writing to a new csv file such as:
>
> record_id val_t1, val_t2, val_t3, val_t4, *avg_t1_t4*
> 34598     45.09,  42.73,  40.82,  35.92,  *42.90*
> 45523     25.56,  56.23,  67.32,  14.83,  *47.20*
>
> - E
>
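If the average is meant to be taken within each row, as the question above reads, no fold over the stream is needed: each output row depends only on its own input row. The following is a minimal sketch under that assumption, using only `base`; the record types are simplified and the names `rowAverage` and `addRowAverage` are hypothetical, introduced for illustration.

```haskell
-- Simplified input row: the four readings are kept in a list.
data InputRow = InputRow
    { recordId :: Int
    , values   :: [Double]   -- val_t1 .. val_t4
    } deriving (Show)

-- Input row extended with the derived field.
data OutputRow = OutputRow
    { outRecordId :: Int
    , outValues   :: [Double]
    , outAverage  :: Double  -- avg_t1_t4, computed from this row only
    } deriving (Show)

-- Arithmetic mean; an empty row is treated as 0 to avoid dividing by zero.
rowAverage :: [Double] -> Double
rowAverage [] = 0
rowAverage xs = sum xs / fromIntegral (length xs)

-- Pure row-to-row transformation that appends the derived field.
addRowAverage :: InputRow -> OutputRow
addRowAverage (InputRow rid vs) = OutputRow rid vs (rowAverage vs)
```

For the two sample rows this yields approximately 41.14 and 40.985. In a pipes pipeline, a pure function of this shape can be applied to every element with `Pipes.Prelude.map`.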
> On Monday, February 1, 2016 at 1:38:22 PM UTC-5, Edmund Cape wrote:
>>
>> Hi -- I'm still working out the best, most idiomatic way to use pipes
>> to stream through large csv datasets (1M or more records) to accomplish
>> the following:
>>
>>
>>    1. Generate summary statistics for any one field, or series of fields,
>>    moving down the data using a fold, grouped by a given field in the
>>    data; output to a file
>>
>>    2. Generate a separate set of summary statistics (e.g., a regression,
>>    min, max) combining fields within a single record (row); use this
>>    information to transform/grow the record with additional (new) derived
>>    fields; output to a file (separate from the one above)
>>
>> These are separate tasks.  Because I don't need the results from (1) to
>> do any of the transformations in (2), it would be nice to accomplish both
>> simultaneously (a single continuous stream that produces two files) so I
>> can make the transformations in (2) and report on the results of what was
>> accomplished at a summary level (1).
>>
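The idea of doing (1) and (2) in one traversal can be sketched on plain lists with `Data.List.mapAccumL`, which threads an accumulator (the summary) through the rows while also emitting one transformed row per input row. This is only an illustration of the single-pass shape, using `base` alone; it does not stream to two files the way a pipes solution would. The `Summary` and `Derived` types, the particular statistics chosen, and the function names are assumptions made for the example.

```haskell
import Data.List (mapAccumL)

data Rec = Rec { uid :: String, val1 :: Int, val2 :: Int }
    deriving (Show, Eq)

-- Task (1): statistics taken down the columns.
data Summary = Summary
    { rowCount :: Int
    , sumVal1  :: Int
    , maxVal2  :: Maybe Int   -- Nothing until the first row is seen
    } deriving (Show, Eq)

-- Task (2): the original record plus a field derived from that row alone.
data Derived = Derived { source :: Rec, rowMax :: Int }
    deriving (Show, Eq)

-- Update the summary and produce the transformed row in the same step.
step :: Summary -> Rec -> (Summary, Derived)
step (Summary n s m) r =
    ( Summary (n + 1) (s + val1 r) (Just (maybe (val2 r) (max (val2 r)) m))
    , Derived r (max (val1 r) (val2 r))
    )

-- One pass over the rows yields both results.
summarizeAndDerive :: [Rec] -> (Summary, [Derived])
summarizeAndDerive = mapAccumL step (Summary 0 0 Nothing)
```

Because `step` is an ordinary left-fold step paired with a per-row output, the same function could in principle drive a streaming loop that writes each `Derived` row as it is produced and writes the `Summary` once at the end.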
>> Here is my starting point.  Thank you in advance for any guidance.
>>
>> {-# LANGUAGE OverloadedStrings #-}
>>
>> import Pipes.Csv (decodeByName)
>> import Pipes.ByteString (ByteString, fromHandle)
>> import Data.Csv ((.:), FromNamedRecord(..))
>> import Pipes
>> import Control.Applicative
>> import System.IO hiding (stdin)
>>
>> data File = File {name::String} deriving (Show)
>> inFile :: File
>> inFile  = File "../data.csv"
>>
>> data Rec = Rec { uid  :: String
>>                , val1 :: Int
>>                , val2 :: Int } deriving (Show)
>>
>> instance FromNamedRecord Rec where
>>   parseNamedRecord r =
>>     Rec <$> r .: "uid"
>>         <*> r .: "val1"
>>         <*> r .: "val2"
>>
>> records :: Monad m
>>         => Producer ByteString m ()
>>         -> Producer (Either String Rec) m ()
>> records = decodeByName
>>
>> main :: IO ()
>> main =
>>   withFile (name inFile) ReadMode  $ \hIn  ->
>>   runEffect $ for (records (fromHandle hIn)) (lift . print)
>>
>>
>> - E
>>
> --
> You received this message because you are subscribed to the Google Groups
> "Haskell Pipes" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to haskell-pipes+unsubscr...@googlegroups.com.
> To post to this group, send email to haskell-pipes@googlegroups.com.


-- 

Edmund Cape, PhD MBA
(917) 715-8299
edmund.c...@gmail.com
