This is a forwarded message
From: Bulat Ziganshin <[EMAIL PROTECTED]>
To: haskell@haskell.org
Date: Thursday, December 08, 2005, 1:36:05 AM
Subject: [Haskell] ANNOUNCE: Process library (for dataflow-oriented programming?)

===8<==============Original message text===============
Hello haskell,

Joel's program (discussed in cafe), which now uses MVars instead of Channels to send data between threads, may be a good example of a dataflow-driven program. It consists of many hundreds of threads, and when one thread sends data to another through an MVar, the sending thread in most cases goes to sleep until the receiving thread has processed the previous value of this MVar. So threads wake up and go to sleep according to the values passed between them, and the whole program executes in the order defined by these data dependencies, as opposed to the order of program statements.

One year ago I developed a small library which can be helpful if you want to use this style of programming. Its ideas are modelled after Unix pipes, which are widely used to assemble complex data processing "engines" from simple "parts". Really, this library is a very thin layer over direct use of forkOS, channels and MVars; nevertheless, it is very convenient and beautiful.

You can download the library as http://freearc.narod.ru/Process.tar.gz
This page also contains sources of my program, where you can find examples of using the library in a real toy :)

Below is a guide to library usage.

To create a pipe which contains 3 processes - "producer", "transformer" and "consumer":

  runP ( producer |> transformer |> consumer )

Each process in the pipe runs in a separate Haskell thread. A process is represented by an ordinary Haskell function which gets an additional parameter - a handle, which can be used to receive data from the previous process in the pipe (using receiveP) and send data to the next process (using sendP).

For example, the abovementioned processes can be implemented as:

  producer handle = mapM_ (sendP handle) [1..10]

  transformer handle = replicateM_ 10 $ do
                         x <- receiveP handle
                         sendP handle (x*2)

  consumer handle = replicateM_ 10 $ do
                      x <- receiveP handle
                      print x

If the first process in a pipe tries to use receiveP, or the last process in a pipe tries to use sendP, a run-time exception is generated.

The number of processes in a pipe can be arbitrary. Because each process is just an ordinary Haskell function, you can add additional parameters to processes when constructing pipes:

  runP ( producer |> multiple 2 |> multiple 3 |> consumer )

  multiple n handle = replicateM_ 10 $ do
                        x <- receiveP handle
                        sendP handle (x*n)

Moreover, you can construct a pipe, or part of it, as an ordinary data value, which can then be run by runP:

  let pipe = case multipliers of
               [x]     -> multiple x
               [x,y]   -> multiple x |> multiple y
               [x,y,z] -> multiple x |> multiple y |> multiple z
               _       -> \handle -> fail "Zero or too much multipliers"
  runP ( producer |> pipe |> consumer )

There is also a "back channel", which can be used to "return" data to the previous process in the pipe; its operations are send_backP and receive_backP. It can be used to return acknowledgments, to synchronize processes, or to return resources. A brief example of its usage:

  producer:                        ; consumer:
    sendP pipe (buf,len)           ;
                                   ;   (buf,len) <- receiveP pipe
                                   ;   hPutBuf file buf len
                                   ;   send_backP pipe ()
    receive_backP pipe             ;
    --now we know that buf is free ;

(I organized the lines to show execution order.)

If processes are joined in a pipe with "|>", the channel between them uses an MVar, so at any moment it may contain no more than 1 element. If the channel between two processes is created with "|>>>", a Chan is used, which can contain an arbitrary number of data items. Be careful with such channels, because they can grow to unlimited size.

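The difference between the two kinds of channel can be seen with the standard primitives alone. The sketch below does not use the Process library; it assumes only Control.Concurrent.MVar and Control.Concurrent.Chan from base, and the names mvarDemo and chanDemo are made up for illustration.

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, tryPutMVar)
import Control.Monad (forM_, replicateM)

-- An MVar holds at most one value. tryPutMVar reports whether the slot
-- was free; a plain putMVar would block the sender in the second case.
mvarDemo :: IO (Bool, Bool)
mvarDemo = do
  box    <- newEmptyMVar
  first  <- tryPutMVar box (1 :: Int)  -- slot was empty, value stored
  second <- tryPutMVar box 2           -- slot is full, value rejected
  return (first, second)

-- A Chan is unbounded: every write succeeds at once, so a fast producer
-- can queue up any number of items before the consumer reads them.
chanDemo :: IO [Int]
chanDemo = do
  ch <- newChan
  forM_ [1 .. 5] (writeChan ch)
  replicateM 5 (readChan ch)

main :: IO ()
main = do
  mvarDemo >>= print   -- (True,False)
  chanDemo >>= print   -- [1,2,3,4,5]
```

With putMVar in place of tryPutMVar, the second write would put the sender to sleep until a reader took the first value, which is the behaviour described for "|>".
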
"|>" and "|>>>" can be arbitrarily combined in one pipe:

  runP ( producer |>>> multiple 2 |> multiple 3 |>>> consumer )

The back channel (used by send_backP and receive_backP) is always multi-element (it uses Chan).

runP returns when all processes in the pipe have finished. If any process in the pipe raises an uncaught exception, all processes in the pipe are killed and the exception is re-raised in the thread that called runP.

A pipe or a single process can also be run in the background using runAsyncP:

  handle <- runAsyncP (multiple 2)

The handle returned here can be used to interact with the first and last processes in the pipe, in contrast to runP:

  handle <- runAsyncP (multiple 2)
  sendP handle 1
  res <- receiveP handle

Of course, a pipe run asynchronously is not required to perform input, output, or both:

  handle <- runAsyncP ( producer |> transformer )
  handle <- runAsyncP ( transformer |> consumer )
  handle <- runAsyncP ( producer |> transformer |> consumer )

Currently, channels to the "open ends" of a background pipe are always one-element (they use MVars). The types of channels inside the pipe are determined, as usual, by using the "|>" or "|>>>" operator.

You can wait for an asynchronous pipe to finish with "joinP handle".

You can "replace" a process which is running in a pipe with another process or pipe by using runFuncP:

  transformer handle = runFuncP ( multiple 2 |> multiple 3 )
                                (receiveP handle)
                                (send_backP handle)
                                (sendP handle)
                                (receive_backP handle)

Actually, runFuncP just executes its pipe using the 4 supplied functions for interaction with the first and last pipe processes. To make this obvious, I will show how runP can be implemented in terms of runFuncP:

  runP p = runFuncP p
             (error "First process in runP tried to receive")
             (error "First process in runP tried to send_back")
             (error "Last process in runP tried to send")
             (error "Last process in runP tried to receive_back")

runFuncP returns when all processes in its pipe have finished.

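The idea of running a process against caller-supplied functions can be sketched without the library. This is not the library's implementation: EndPoints, Proc, multipleE and runFuncSketch are hypothetical names, the back channel is left out, and only a single process is run, with no threads involved.

```haskell
import Control.Monad (replicateM_)
import Data.IORef (atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for a handle: one action to receive a value,
-- one action to send a value.
data EndPoints i o = EndPoints
  { receiveE :: IO i
  , sendE    :: o -> IO ()
  }

-- A process is an ordinary function of its handle.
type Proc i o = EndPoints i o -> IO ()

-- Reads `count` values and sends each one multiplied by n.
multipleE :: Int -> Int -> Proc Int Int
multipleE count n h = replicateM_ count $ do
  x <- receiveE h
  sendE h (x * n)

-- Run a process with its two ends replaced by the given actions.
runFuncSketch :: Proc i o -> IO i -> (o -> IO ()) -> IO ()
runFuncSketch p recv send = p (EndPoints recv send)

-- Feed the process from a list and collect what it sends into a list.
demo :: IO [Int]
demo = do
  inputs  <- newIORef [1, 2, 3]
  outputs <- newIORef []
  let recv = atomicModifyIORef' inputs $ \xs -> case xs of
        (y : ys) -> (ys, y)
        []       -> error "demo: no more input"
      send y = modifyIORef' outputs (y :)
  runFuncSketch (multipleE 3 2) recv send
  reverse <$> readIORef outputs

main :: IO ()
main = demo >>= print   -- [2,4,6]
```

Here the input comes from a list and the output goes into a list, which is the same kind of substitution that runFuncP allows.
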
Using runFuncP, you can create an unlimited number of scenarios: running several runFuncP calls sequentially to deal with different parts of your data, processing part of the data yourself and part with runFuncP, or "redirecting" your input to a process run by runFuncP while consuming its output in another way:

  consumer handle = runFuncP ( multiple 4 )
                             (receiveP handle)
                             undefined
                             print      -- output action
                             undefined

runFuncP can also be used in other cases when we need to run a single process or a whole pipe while "imitating" its interaction with the "external world" by some functions:

  runFuncP transformer
           readLn     -- input action
           undefined
           print      -- output action
           undefined

It would also be interesting to have some functions which can just insert a "filtering" process or pipe on the input or output side of the current process:

  transformer channel = do
    channel <- insertInputFilterP  (multiple 2) channel
    channel <- insertOutputFilterP (multiple 4) channel
    x <- receiveP channel
    sendP channel (x+1)
    ...

but I don't need this, and therefore it is currently not implemented.

The library also doesn't contain routines to check that input is ready or that output can be done (because I don't need them and it is a bad programming style), nor routines to check for EOF (I prefer to encode this explicitly in the structure of the data sent across channels; you can also wrap your data in the Maybe type and use Nothing to encode EOF).

-- 
Best regards,
 Bulat                          mailto:[EMAIL PROTECTED]

_______________________________________________
Haskell mailing list
Haskell@haskell.org
http://www.haskell.org/mailman/listinfo/haskell
===8<===========End of original message text===========

_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe