Hello, everyone! I'm Haskell beginner and newbie in Streaming library. As I 
understood, this group is dedicated to Pipes, but Streaming library too. My 
simple question is related to Streaming library usage. I'm trying to 
realize next common and popular "pattern": processing of streaming items 
with common state, see Fig, please:

     .--state-------+--state'--------+--state''-->
     |              |                |
  [e0..eN] ==> [e0'...eN'] ==> [e0''..eN''] =====>


This "state" will be used for statistics, errors, whatever - through the 
whole workflow. Some of "pipe" nodes will iterate over `eN` items (which 
will be lists/streams too), concatenates results... How this can be 
achieved with Streaming library? I mean each "node" should have access to 
stream items but to "global" state (result of prev. node return?) too. Also 
nodes will do IO actions!

My first attempt was:

...
import Streaming
import qualified Streaming.Prelude as S
...


-- simulate source of items in the stream
gen :: S.Stream (S.Of Int) IO [String]
gen = do
  S.yield 1000
  S.yield 2000
  x <- lift getLine -- simulate item got through IO
  return ["a", "b", "c", x]


-- simulate some node - processor on the pipe
proc :: S.Stream (S.Of Int) IO [String] -> S.Stream (S.Of Int) IO [String]
proc str = do
  e <- str -- WRONG!
  -- S.yield $ e + 8
  lift $ print "Enter x:"
  x <- lift getLine -- simulate IO communication
  return $ e ++ [" -- " ++ x] -- simulate change of common state via result 
component of ":>" pair

main :: IO ()
main = do
  let
  s <- S.mapM_ print $ S.map show gen
  p <- S.mapM_ print $ proc gen
  putStr "s: " >> print s
  putStr "p: " >> print p
  print "end."


But this is wrong sure, because "do e <- str" binds "e" to results of 
previous result, but not to stream's items. And I need to have access to 
both: stream's items and state (which I suppose) can be second component of 
Stream ":>" pair, i.e. result, returning with "return" function.

I tried also to implement "proc" as "a -> m b" function and to apply it 
with "S.mapM", but in this case I'm processing stream's items but without 
previous "return" - state.

Intuitively I'm feeling that solution may be in transformation of result 
monad "m", to be not only IO, but with "StateT", but I don't know how to:

   1. do it
   2. call it  - in drawn workflow
   3. if I'll yield items wrapped in State monad, what a devil will be it, 
   a specially related to performance.
   
I'll need to iterate over some stream's items too (they are lists), to join 
results and forever to have common state.

Can somebody help with it, please?

===
Best regards, Paul

-- 
You received this message because you are subscribed to the Google Groups 
"Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to haskell-pipes+unsubscr...@googlegroups.com.
To post to this group, send email to haskell-pipes@googlegroups.com.

Reply via email to