I have following problem: I'd like to operate on big files so I'd prefere to operate on 'stream' instead of whole file at a time to avoid keeping too much in memory. I need to calculate MD5 and compress file.
I tried to use something like that but I'm afraid that I'd need to patch zlib package as it results in deadlock: > {-# LANGUAGE GADTs #-} > import Codec.Compression.GZip > import Control.Applicative > import Control.Concurrent.CHP > import qualified Control.Concurrent.CHP.Common as CHP > import Control.Concurrent.CHP.Enroll > import Control.Concurrent.CHP.Utils > import Control.Monad.State.Strict > import Data.Digest.Pure.MD5 > import Data.Maybe > import qualified Data.ByteString.Char8 as BS > import qualified Data.ByteString.Lazy.Char8 as LBS > import qualified Data.ByteString.Lazy.Internal as LBS > import System.Environment > import System.IO > import System.IO.Unsafe > > > calculateMD5 :: (ReadableChannel r, > Poisonable (r (Maybe BS.ByteString)), > WriteableChannel w, > Poisonable (w MD5Digest)) > => r (Maybe BS.ByteString) > -> w MD5Digest > -> CHP () > calculateMD5 in_ out = evalStateT (forever loop) md5InitialContext > `onPoisonRethrow` (poison in_ >> poison out) > where loop = liftCHP (readChannel in_) >>= > calc' > calc' Nothing = gets md5Finalize >>= > liftCHP . > writeChannel out >> > put md5InitialContext > calc' (Just b) = modify (flip md5Update > $ LBS.fromChunks [b]) Calculate MD5 hash of input stream. Nothing indicates EOF. > unsafeInterleaveCHP :: CHP a -> CHP a > unsafeInterleaveCHP = fromJust <.> liftIO <=< > unsafeInterleaveIO <.> embedCHP Helper function. It is suppose to move the execution in time - just as unsafeInterleaveIO. I belive that the main problem lives here. Especially that Maybe.fromJust: Nothing is the error. > chan2List :: (ReadableChannel r, Poisonable (r a)) > => r a -> CHP [a] > chan2List in_ = unsafeInterleaveCHP ((liftM2 (:) (readChannel in_) > (chan2List in_)) > `onPoisonTrap` return []) Changes channel to lazy read list. > chanMaybe2List :: (ReadableChannel r, > Poisonable (r (Maybe a))) > => r (Maybe a) > -> CHP [[a]] > chanMaybe2List in_ = splitByMaybe <$> chan2List > where splitByMaybe [] = [] > splitByMaybe (Nothing:xs) = > []:splitByMaybe xs > splitByMaybe (Just v :[]) = [[v]] > splitByMaybe (Just v :xs) = > let (y:ys) = splitByMaybe xs > in (v:y):ys Reads lazyly from channel o list of list > compressCHP :: (ReadableChannel r, > Poisonable (r (Maybe BS.ByteString)), > WriteableChannel w, > Poisonable (w (Maybe BS.ByteString))) > => r (Maybe BS.ByteString) > -> w (Maybe BS.ByteString) > -> CHP () > compressCHP in_ out = toOut >>= mapM_ sendBS > where in_' :: CHP [LBS.ByteString] > in_' = fmap LBS.fromChunks <$> > chanMaybe2List in_ > toOut :: CHP [LBS.ByteString] > toOut = fmap compress <$> in_' > sendBS :: LBS.ByteString -> CHP () > sendBS LBS.Empty = writeChannel out > Nothing > sendBS (LBS.Chunk c r) = writeChannel out > (Just c) > >> sendBS r Compress process > readFromFile :: (ReadableChannel r, > Poisonable (r String), > WriteableChannel w, > Poisonable (w (Maybe BS.ByteString))) > => r String > -> w (Maybe BS.ByteString) > -> CHP () > readFromFile file data_ = > forever (do path <- readChannel file > hnd <- liftIO $ openFile path ReadMode > let copy = liftIO (BS.hGet hnd LBS.defaultChunkSize) >>= > writeChannel data_ . Just > copy `onPoisonRethrow` liftIO (hClose hnd) > writeChannel data_ Nothing > liftIO $ hClose hnd) > `onPoisonRethrow` (poison file >> poison data_) Process reading from file > writeToFile :: (ReadableChannel r, > Poisonable (r String), > ReadableChannel r', > Poisonable (r' (Maybe BS.ByteString))) > => r String > -> r' (Maybe BS.ByteString) > -> CHP () > writeToFile file data_ = > forever (do path <- readChannel file > hnd <- liftIO $ openFile path WriteMode > let writeUntilNothing = readChannel data_ >>= > writeUntilNothing' > writeUntilNothing' Nothing = return () > writeUntilNothing' (Just v) = liftIO (BS.hPutStr > hnd v) >> > writeUntilNothing > writeUntilNothing `onPoisonFinally` liftIO (hClose hnd)) > `onPoisonRethrow` (poison file >> poison data_) Process writing to file > getFiles :: (WriteableChannel w, Poisonable (w String)) > => w String -> CHP () > getFiles out = mapM_ (writeChannel out) ["test1", "test2"] >> > poison (out) Sample files. Each contains "Test1\n" > pipeline1 :: CHP () > pipeline1 = do md5sum <- oneToOneChannel' $ chanLabel "MD5" > runParallel_ [(getFiles ->|^ > ("File", readFromFile) ->|^ > ("Data", calculateMD5)) > (writer md5sum), > forever $ readChannel (reader md5sum) >>= > liftIO . print] First pipeline. Output: fa029a7f2a3ca5a03fe682d3b77c7f0d fa029a7f2a3ca5a03fe682d3b77c7f0d < File."test1", Data.Just "Test1\n", Data.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, File."test2", Data.Just "Test1\n", Data.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d > > pipeline2 :: CHP () > pipeline2 = enrolling $ do > file <- oneToManyChannel' $ chanLabel "File" > fileMD5 <- oneToOneChannel' $ chanLabel "File MD5" > data_ <- oneToOneChannel' $ chanLabel "Data" > md5 <- oneToOneChannel' $ chanLabel "MD5" > md5BS <- oneToOneChannel' $ chanLabel "MD5 ByteString" > fileMD5' <- Enroll (reader file) > fileData <- Enroll (reader file) > liftCHP $ runParallel_ [getFiles (writer file), > (forever $ readChannel fileMD5' >>= > writeChannel (writer fileMD5) . > (++".md5")) > `onPoisonRethrow` > (poison fileMD5' >> > poison (writer fileMD5)), > readFromFile fileData (writer data_), > calculateMD5 (reader data_) (writer md5), > (forever $ do v <- readChannel (reader md5) > let v' = Just $ BS.pack $ show v > writeChannel (writer md5BS) v' > writeChannel (writer md5BS) > Nothing) > `onPoisonRethrow` > (poison (writer md5BS) >> > poison (reader md5)), > writeToFile (reader fileMD5) (reader md5BS)] Correct pipeline (testing EnrollingT): < _b4, File MD5."test1.md5", Data.Just "Test1\n", Data.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, _b4, MD5 ByteString.Just "fa029a7f2a3ca5a03fe682d3b77c7f0d", Data.Just "Test1\n", Data.Nothing, MD5 ByteString.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, File MD5."test2.md5", MD5 ByteString.Just "fa029a7f2a3ca5a03fe682d3b77c7f0d", MD5 ByteString.Nothing > % cat test1.md5 fa029a7f2a3ca5a03fe682d3b77c7f0d% > pipeline3 :: CHP () > pipeline3 = enrolling $ do > file <- oneToManyChannel' $ chanLabel "File" > fileGZ <- oneToOneChannel' $ chanLabel "File GZ" > data_ <- oneToManyChannel' $ chanLabel "Data" > compressed <- oneToManyChannel' $ chanLabel "Data Compressed" > md5 <- oneToOneChannel' $ chanLabel "MD5" > md5Compressed <- oneToOneChannel' $ chanLabel "MD5 Compressed" > fileGZ' <- Enroll (reader file) > fileData <- Enroll (reader file) > dataMD5 <- Enroll (reader data_) > dataCompress <- Enroll (reader data_) > compressedFile <- Enroll (reader compressed) > compressedMD5 <- Enroll (reader compressed) > liftCHP $ runParallel_ [getFiles (writer file), > (forever $ readChannel fileGZ' >>= > writeChannel (writer fileGZ) . > (++".gz")) > `onPoisonRethrow` > (poison fileGZ' >> poison (writer fileGZ)), > readFromFile fileData (writer data_), > calculateMD5 dataMD5 (writer md5), > compressCHP dataCompress > (writer compressed), > writeToFile (reader fileGZ) compressedFile, > calculateMD5 compressedMD5 > (writer md5Compressed), > forever $ readChannel dataMD5 >>= > liftIO . print >> > readChannel compressedMD5 >>= > liftIO . print] Problems: (CHP) Thread terminated with: thread blocked indefinitely in an STM transaction < _b3, _b4, File GZ."test1.gz" > > onPoisonFinally :: CHP a -> CHP () -> CHP a > onPoisonFinally m b = (m `onPoisonRethrow` b) <* b > Utility function (used for closing handles) > (<.>) :: Functor f => (b -> c) -> (a -> f b) -> a -> f c > f <.> g = fmap f . g <.> is for <$> as . to $. > instance MonadCHP m => MonadCHP (StateT s m) where > liftCHP = lift . liftCHP Missing instance for strict monad > (->|^) :: Show b > => (Chanout b -> CHP ()) -> (String, Chanin b -> c -> CHP ()) > -> (c -> CHP ()) > (->|^) p (l, q) x = do c <- oneToOneChannel' $ chanLabel l > runParallel_ [p (writer c), q (reader c) x] 'Missing' helper function > data EnrollingT a where > Lift :: CHP a -> EnrollingT a > Enroll :: (Enrollable b z) => b z -> EnrollingT (Enrolled b z) > > enrolling :: EnrollingT a -> CHP a > enrolling (Lift v) = v > enrolling (Enroll b) = enroll b return > > instance Monad EnrollingT where > (Lift m) >>= f = Lift $ m >>= enrolling . f > (Enroll b) >>= f = Lift $ enroll b (enrolling . f) > return = Lift . return > instance MonadIO EnrollingT where > liftIO = Lift . liftIO > instance MonadCHP EnrollingT where > liftCHP = Lift Helper monad for enrolling (I know T should stand for transforming but then I realize problems). Thanks in advance _______________________________________________ Haskell-Cafe mailing list Haskell-Cafe@haskell.org http://www.haskell.org/mailman/listinfo/haskell-cafe