Figured out how to do this. Adding the code here for others who might try to solve this problem in future. Improvement suggestions are welcome.
import Streaming.Prelude as S import Data.IORef import Streaming as S import qualified Codec.Compression.Zstd.Streaming as Z import qualified Data.ByteString as BS (ByteString,empty) -- Compression streamer - uses Zstd compression streamZstd :: (MonadIO m,Monad m) => IO Z.Result -> Stream (Of BS.ByteString ) m () -> Stream (Of BS.ByteString) m () streamZstd pop inp = loop inp pop where loop bytes res = do bs <- liftIO res case bs of Z.Error who what -> error (who ++ ": " ++ what) Z.Done bs -> (lift . S.uncons $ bytes) >>= (maybe (S.yield bs) (\_ -> error "Compress/Decompress ended while input stream still had bytes")) Z.Produce bs npop -> S.yield bs >> loop bytes npop -- if we run out of input stream, call loop with empty stream, and compress function with empty ByteString -- to signal end - we should then be in Done state in next call to loop Z.Consume f -> (lift . S.uncons $ bytes) >>= (maybe (loop (return ()) (f BS.empty)) (\(bs,nbs) -> loop nbs (f bs))) decompress :: (MonadIO m,Monad m) => Stream (Of BS.ByteString) m () -> Stream (Of BS.ByteString) m () decompress = streamZstd Z.decompress compress :: (MonadIO m,Monad m) => Int -> Stream (Of BS.ByteString) m () -> Stream (Of BS.ByteString) m () compress level = streamZstd (Z.compress level) On Saturday, March 10, 2018 at 5:56:10 PM UTC-5, Sal wrote: > > Hello, > > I am trying to adapt streaming version of `zstandard > <https://hackage.haskell.org/package/zstd-0.1.0.0/docs/Codec-Compression-Zstd-Streaming.html>` > > using `Streaming` library. There is already a version > <https://github.com/michaelt/streaming-zstd/blob/master/Streaming/Zstd.hs> > that exists using `ByteString m r`. I can't figure out how to implement a > function like below, and will appreciate help: > > stream :: MonadIO m => S.Stream (S.Of B.ByteString) m r -> Result -> S. > Stream (S.Of B.ByteString) m r > > Here is the original code from `streaming-zstd`: > > stream :: MonadIO m => ByteString m r -> Result -> ByteString m r > stream (Go m) res = lift m >>= flip stream res > stream bs (Error who what) = error (who ++ ": " ++ what) > stream bs (Produce bytes res') = Chunk bytes (liftIO res' >>= stream bs) > stream (Chunk c cs) (Consume f) = liftIO (f c) >>= stream cs > stream (Empty r) (Consume f) = liftIO (f mempty) >>= stream (Empty r) > stream (Empty r) (Done o) = Chunk o (Empty r) > stream input state = error $ "unpossible! bytes of input left in stream > state " > ++ show state > > > > > > -- You received this message because you are subscribed to the Google Groups "Haskell Pipes" group. To unsubscribe from this group and stop receiving emails from it, send an email to haskell-pipes+unsubscr...@googlegroups.com. To post to this group, send email to haskell-pipes@googlegroups.com.