In my spare time, I've been thinking about (and trying) porting
Left-Fold Enumerators (http://okmij.org/ftp/Streams.html) to Clojure for
a week or so. Incidentally
(http://clojure-log.n01se.net/date/2008-11-06.html#20:41), I learnt that
Rich is also thinking about them.

Here is a report of my trials and errors on the way to implementing LFE 
in Clojure. You can go straight to step 5 for actual working code and a 
repl session.
(Abstract: I learnt clever tricks using continuations or the Y
combinator and turned them into a lousy map.)



STEP 0:
The main idea behind LFE (compared to lazy IO or imperative IO) is to
introduce an inversion of control: removing resource allocation (file
handles etc.) and iteration code from user code.
E.g.:
 > (defn count-words [filename]
 >   (with-open [rdr (-> filename java.io.FileReader. java.io.BufferedReader.)]
 >     (reduce (fn [c line] (+ c (-> line (.split " ") count))) 0
 >             (line-seq rdr))))

can be rewritten using an enumerator:
 > (defn count-words [filename]
 >   ((enum-lines filename)
 >    (fn [c line] (+ c (-> line (.split " ") count)))
 >    0))

or, introducing an ereduce function:
 > (defn ereduce [f seed e]
 >   (e f seed))
 >
 > (defn count-words [filename]
 >   (let [io-enum (enum-lines filename)]
 >     (ereduce (fn [c line] (+ c (-> line (.split " ") count))) 0 io-enum)))

where enum-lines returns a function which "reduces" the lines of the 
specified file using the function and the seed passed as arguments. 
Making enumerators implement clojure.lang.IReduce would allow us to 
directly use Clojure's reduce with enumerators.
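
Here is a minimal sketch of what such an enum-lines could look like at
this stage. It is an assumption for illustration (no implementation is
given above for this step) and it uses the current vector-binding syntax
of with-open: the enumerator opens the file, reduces eagerly over its
lines and closes the reader before returning.

```clojure
(import '(java.io BufferedReader FileReader))

;; Hypothetical STEP 0 enum-lines: returns a function of [f seed].
(defn enum-lines [filename]
  (fn [f seed]
    (with-open [rdr (BufferedReader. (FileReader. (str filename)))]
      ;; reduce is eager, so every line is consumed before rdr is closed
      (reduce f seed (line-seq rdr)))))

;; Same count-words as above, built on the sketch.
(defn count-words [filename]
  ((enum-lines filename)
   (fn [c line] (+ c (count (.split ^String line " "))))
   0))
```

The user code never sees the reader, which is the whole point of the
inversion of control.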

One can build an enumerator from collections:
 > (defn enum-coll [coll]
 >   (fn [f seed]
 >     (reduce f seed coll)))

So far so good. One can even define basic combinators and predicates on 
enumerators: econcat, etake, edrop, efilter, emap (only on one enum).
For example:
 > (defn econcat [e1 e2]
 >   (fn [f seed]
 >     (e2 f (e1 f seed))))
 >
 > (defn edrop [n e]
 >   (fn [f seed]
 >     (first (e (fn [[acc i] x]
 >                 (if (pos? i) [acc (dec i)] [(f acc x) 0]))
 >               [seed n]))))
 >
 > (defn efilter [pred e]
 >   (fn [f seed]
 >     (e #(if (pred %2) (f %1 %2) %1) seed)))
 >
 > (defn etake [n e]
 >   (fn [f seed]
 >     ((e (fn [[acc n :as a] x]
 >           (if (pos? n) [(f acc x) (dec n)] a))
 >         [seed n])
 >      0)))
 >
 > (def enil ; the empty enumerator
 >   (fn [f seed]
 >     seed))
 >
 > (defn enil? [e]
 >   (e (constantly false) true))
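
The single-enumerator emap mentioned above is not shown, so here is one
possible sketch, together with a small composition. The names emap1 and
to-vec are made up for this example; the other definitions are repeated
from above so that the snippet runs on its own.

```clojure
(defn enum-coll [coll]
  (fn [f seed] (reduce f seed coll)))

(defn econcat [e1 e2]
  (fn [f seed] (e2 f (e1 f seed))))

(defn efilter [pred e]
  (fn [f seed] (e #(if (pred %2) (f %1 %2) %1) seed)))

;; Hypothetical single-enumerator emap: transform each item before
;; handing it to the reducing function.
(defn emap1 [g e]
  (fn [f seed] (e (fn [acc x] (f acc (g x))) seed)))

;; Hypothetical helper: collect an enumerator into a vector.
(defn to-vec [e] (e conj []))

(to-vec (econcat (emap1 inc (enum-coll [1 2 3]))
                 (efilter even? (enum-coll (range 6)))))
;; => [2 3 4 0 2 4]
```

Every combinator only wraps the reducing function or the seed, which is
why they compose so easily at this stage.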

One can also build enumerators based on java.nio which would be reduced
on an IO thread. (Since (reduce f seed coll) is roughly equivalent to
(let [a (agent seed)] (doseq [x coll] (send a f x)) (await a) @a),
integration between multiplexed enumerators and agents is relatively
straightforward.)

There are two drawbacks to this simple design:
* the enumerated collection must be entirely processed (no early
termination, see enil? or etake),
* no "parallel loops" (btw, there aren't many facilities for "parallel
traversal" in Clojure: there's map... and map; it could be a job for
the vector-bound doseq (and then we would need a dofor...)).

Adding early exit means designing a protocol for the reducing function
to signal the enumerator to stop there. It's not a hard problem, so I
won't bother with it until the end of this post.
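
For what it's worth, one possible out-of-band protocol is sketched
below. It assumes a Clojure version that provides reduced (1.5 or
later), and it only works for enumerators built directly on reduce, such
as enum-coll; combinators like econcat would need extra care because
reduce unwraps the marker before they see it. This is an illustration,
not the design settled on in this post.

```clojure
(defn enum-coll [coll]
  (fn [f seed] (reduce f seed coll)))

;; etake that stops the underlying reduce as soon as n items are taken.
(defn etake [n e]
  (fn [f seed]
    (if (pos? n)
      (first (e (fn [[acc i] x]
                  (let [acc' (f acc x)]
                    (if (> i 1)
                      [acc' (dec i)]
                      (reduced [acc' 0])))) ; signal: stop here
                [seed n]))
      seed)))

;; enil? that looks at no more than one item.
(defn enil? [e]
  (e (fn [_ _] (reduced false)) true))

((etake 3 (enum-coll (range))) conj [])
;; => [0 1 2], even though (range) is infinite
```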



STEP 1:
Concerning "parallel loops" (that is, an emap that works on several
enumerators at once, like map does with seqs), I tried to come up with a
solution using one of Oleg Kiselyov's tricks to turn an enumerator
inside out.
The trick is to use a non-recursive version of the enumerator: this
non-recursive enumerator takes the (recursive) enumerator as its first
argument. (Btw, thanks to the recur special form, some clever macrology
can help build a non-recursive enumerator from the source of a
recursive one.)

With a non-recursive enumerator, ereduce becomes:
 > (defn ereduce [f seed non-rec-enum]
 >   (let [good-old-enum (fn self [f seed] (non-rec-enum self f seed))]
 >     (good-old-enum f seed)))

But it smells like a blown stack. Let's trampoline!

 > (defn ereduce [f seed non-rec-enum] ; with trampolining
 >   (let [ccall (fn self [& args] (cons self args)) ; ccall = capture call
 >         ccall? #(and (seq? %) (= (first %) ccall))]
 >     (loop [r (non-rec-enum ccall f seed)]
 >       (if (ccall? r)
 >         (recur (apply non-rec-enum r))
 >         r))))
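
To make this concrete, here is a small non-recursive enumerator running
under the trampolining ereduce. The nre-lines helper is hypothetical: it
wraps an already opened reader, so its state lives implicitly in the
reader, and nobody closes it here.

```clojure
(import '(java.io BufferedReader StringReader))

(defn ereduce [f seed non-rec-enum] ; trampolining version from above
  (let [ccall  (fn self [& args] (cons self args))
        ccall? #(and (seq? %) (= (first %) ccall))]
    (loop [r (non-rec-enum ccall f seed)]
      (if (ccall? r)
        (recur (apply non-rec-enum r))
        r))))

;; Hypothetical non-recursive enumerator: instead of calling itself,
;; it calls `self`, which only captures the arguments of the next call.
(defn nre-lines [^BufferedReader rdr]
  (fn [self f acc]
    (if-let [line (.readLine rdr)]
      (self f (f acc line))
      acc)))

(ereduce conj [] (nre-lines (BufferedReader. (StringReader. "a\nb\nc"))))
;; => ["a" "b" "c"]
```

Each call to the enumerator returns immediately, so the stack depth
stays constant however many lines there are.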

Back to emap:
 > (defn emap [f & nres] ; nres = non-recursive enumerators
 >   (fn [g seed]
 >     (let [ccall (fn self [& args] (cons self args))
 >           ccall? #(and (seq? %) (= (first %) ccall))]
 >       (loop [rs (map #(%1 ccall (fn [_ x] x) nil) nres)
 >              acc seed]
 >         (if (every? ccall? rs)
 >           (recur
 >             ;; déjà vu! Does not sound good, see below
 >             (map #(%1 ccall (fn [_ x] x) nil) nres)
 >             (g acc (apply f (map last rs))))
 >           ;; here I should consume all the enums until the end to ensure
 >           ;; correct resource freeing... or trigger early exit or something
 >           acc)))))



STEP 2:
Well, it's not that easy. There has been an elephant in the room since I
introduced the non-recursive enumerator: implicit state! (And yes, I
know, emap does not return a non-recursive enumerator but a plain
enumerator.)
So the non-recursive enumerator must take and pass along another
argument (its state), and I need a way to initialize that state. I chose
to use different arities to tell the first call from the others.
 > (defn a-non-rec-enum
 >   ([self f seed] ; first call
 >    ......)
 >   ([self state f acc] ; subsequent calls
 >    ......))

The trampolining ereduce does not need to be modified to pass the 
enumerator's state around.
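
As an illustration, here is a seq-backed enumerator with this two-arity
shape (nre-seq is a made-up name), driven by the unchanged trampolining
ereduce from STEP 1:

```clojure
(defn ereduce [f seed non-rec-enum] ; unchanged from STEP 1
  (let [ccall  (fn self [& args] (cons self args))
        ccall? #(and (seq? %) (= (first %) ccall))]
    (loop [r (non-rec-enum ccall f seed)]
      (if (ccall? r)
        (recur (apply non-rec-enum r))
        r))))

;; Hypothetical enumerator over a seqable; the state is the remaining seq.
(defn nre-seq [coll]
  (let [step (fn [self s f acc]
               (if (seq s)
                 (self (rest s) f (f acc (first s))) ; pass the new state on
                 acc))]
    (fn
      ([self f seed]      (step self coll f seed))    ; first call
      ([self state f acc] (step self state f acc))))) ; subsequent calls

(ereduce + 0 (nre-seq [1 2 3 4]))
;; => 10
```

The captured call now holds three arguments instead of two, so applying
the enumerator to it selects the four-argument arity automatically.
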
Here is the new emap:
 > (defn emap [f & nres] ; now with explicit state handling, and returns a non-rec enum
 >   (let [ccall (fn self [& args] (cons self args))
 >         ccall? #(and (seq? %) (= (first %) ccall))
 >         common (fn [rs self g acc]
 >                  (if (every? ccall? rs)
 >                    (self
 >                      (map second rs) ; compound state
 >                      g
 >                      (g acc (apply f (map last rs))))
 >                    acc))] ; still TODO: add proper termination
 >     (fn
 >       ([self g seed]
 >        (let [rs (map #(%1 ccall (fn [_ x] x) nil) nres)]
 >          (common rs self g seed)))
 >       ([self state g acc]
 >        (let [rs (map #(%1 ccall %2 (fn [_ x] x) nil) nres state)] ; no more elephant
 >          (common rs self g acc))))))




STEP 3: where I'm doing it wrong (please skip to STEP 5)

I'm not happy with ignoring an arg (in (fn [_ x] x)) and passing this g 
function around. I think it's time to redefine what constitutes the job 
of a non recursive enumerator: enumerating, not reducing over enumerated 
values.
 > ;; returns [state x] where x is the last enumerated item,
 > ;; or returns eoe when at end
 > (defn a-non-rec-enum-that-does-no-reducing
 >   ([eoe] ; first call
 >    ......)
 >   ([state eoe] ; subsequent calls
 >    ......))

Reducing is ereduce's job:
 > (defn ereduce [f seed non-rec-enum]
 >   (let [eoe (Object.)]
 >     (loop [r (non-rec-enum eoe)
 >            acc seed]
 >       (if (identical? eoe r)
 >         acc
 >         (recur
 >           (non-rec-enum (first r) eoe)
 >           (f acc (second r)))))))

and then emap is simplified to:
 > (defn emap [f & nres]
 >   (let [common (fn [eoe rs]
 >                  (if (some #(identical? eoe %) rs)
 >                    eoe ; still TODO: add proper termination
 >                    [(map first rs) ; compound state
 >                     (apply f (map second rs))]))]
 >     (fn
 >       ([eoe]
 >        (let [rs (map #(%1 eoe) nres)]
 >          (common eoe rs)))
 >       ([state eoe]
 >        (let [rs (map #(%1 %2 eoe) nres state)]
 >          (common eoe rs))))))
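
To see this version run, here is a seq-backed enumerator in the STEP 3
style (nre-seq is a made-up helper) together with the ereduce and emap
from above, repeated so the snippet is self-contained:

```clojure
(defn ereduce [f seed non-rec-enum]
  (let [eoe (Object.)]
    (loop [r (non-rec-enum eoe), acc seed]
      (if (identical? eoe r)
        acc
        (recur (non-rec-enum (first r) eoe)
               (f acc (second r)))))))

(defn emap [f & nres]
  (let [common (fn [eoe rs]
                 (if (some #(identical? eoe %) rs)
                   eoe
                   [(map first rs) (apply f (map second rs))]))]
    (fn
      ([eoe]       (common eoe (map #(%1 eoe) nres)))
      ([state eoe] (common eoe (map #(%1 %2 eoe) nres state))))))

;; Hypothetical enumerator: returns [remaining-seq item], or eoe at the end.
(defn nre-seq [coll]
  (let [step (fn [s eoe] (if (seq s) [(rest s) (first s)] eoe))]
    (fn
      ([eoe]       (step coll eoe))
      ([state eoe] (step state eoe)))))

(ereduce conj [] (emap + (nre-seq [1 2 3]) (nre-seq [10 20])))
;; => [11 22]
```

As with map on seqs, the shortest enumerator decides where the result
ends.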



STEP 4: Still in error

I think I'm drifting too far away from the original LFEs, and there are
still two unaddressed problems:
* early termination,
* proper termination of enumerators in emap.

Fortunately, they are related: they both call for a way to clean up the
state and resources of an enumerator. This time arity overloading won't
do. Let's turn the NRE into a map:
 > {:enumerators/init (fn [eoe] ; returns [state x] or [state eoe]
 >                      .......)
 >  :enumerators/step (fn [state eoe] ; returns [state x] or [state eoe]
 >                      .......)
 >  :enumerators/clean (fn [state] ; the new one
 >                       .......)}

(As I'm writing, I realize that I could get rid of all these eoes by 
returning only [state].
Right now I'll continue to return [state eoe] since it feels more regular.)

ereduce and emap, once again:
 > (defn ereduce [f seed non-rec-enum]
 >   (let [eoe (Object.)]
 >     (loop [[state x] ((non-rec-enum :enumerators/init) eoe)
 >            acc seed]
 >       (if (identical? eoe x)
 >         (do ; argh :-)
 >           ((non-rec-enum :enumerators/clean) state)
 >           acc)
 >         (recur
 >           ((non-rec-enum :enumerators/step) state eoe)
 >           (f acc x))))))
 >
 > (defn emap [f & nres]
 >   (let [inits (map :enumerators/init nres)
 >         steps (map :enumerators/step nres)
 >         cleans (map :enumerators/clean nres)
 >         common (fn [eoe rs]
 >                  (let [state (map first rs)
 >                        vals (map second rs)]
 >                    [state
 >                     (if (some #(identical? eoe %) vals)
 >                       eoe
 >                       (apply f vals))]))]
 >     {:enumerators/init (fn [eoe]
 >                          (let [rs (map #(%1 eoe) inits)]
 >                            (common eoe rs)))
 >      :enumerators/step (fn [state eoe]
 >                          (let [rs (map #(%1 %2 eoe) steps state)]
 >                            (common eoe rs)))
 >      :enumerators/clean (fn [state]
 >                           (dorun (map #(%1 %2) cleans state)))}))


It's interesting to notice that, at this point, etake does not need
early termination:
 > (defn etake [n e]
 >   {:enumerators/init (fn [eoe]
 >                        ;; always init e so that clean gets a real state
 >                        (let [[state x] ((e :enumerators/init) eoe)]
 >                          (if (pos? n)
 >                            [[state (dec n)] x]
 >                            [[state 0] eoe])))
 >    :enumerators/step (fn [[state n] eoe]
 >                        (if (pos? n)
 >                          (let [[state x] ((e :enumerators/step) state eoe)]
 >                            [[state (dec n)] x])
 >                          [[state 0] eoe]))
 >    :enumerators/clean (fn [[state _]]
 >                         ((e :enumerators/clean) state))})

and that multiplexing has been lost while trying to bring in a "parallel
map". In fact, I lost good multiplexing potential at STEP 3: since then,
ereduce applies the reducing function at each step, which implies that
each step must now return a new value, whereas before STEP 3 the only
purpose of a "step" was to advance the computation (think about filtered
enumerators: the computation can advance without calling the reducing
function).
I'm afraid that I reversed the inversion of control :-( and this is why
the last etake function was interesting: it does not involve the
reducing function anymore (see STEP 0).
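
The filtering case can be made concrete with a sketch. Under the STEP 4
protocol a step has to come back with a value (or eoe), so a filter has
no choice but to loop over the underlying enumerator until something
matches. The names enum-seq4 and efilter4 are made up for this example.

```clojure
(defn ereduce [f seed nre] ; STEP 4 version
  (let [eoe (Object.)]
    (loop [[state x] ((nre :enumerators/init) eoe)
           acc seed]
      (if (identical? eoe x)
        (do ((nre :enumerators/clean) state) acc)
        (recur ((nre :enumerators/step) state eoe) (f acc x))))))

;; Hypothetical seq-backed enumerator; the state is the remaining seq.
(defn enum-seq4 [coll]
  (let [step (fn [s eoe] (if (seq s) [(rest s) (first s)] [s eoe]))]
    {:enumerators/init  (fn [eoe] (step coll eoe))
     :enumerators/step  step
     :enumerators/clean (fn [_] nil)}))

;; Hypothetical filter: one outer step may take many inner steps.
(defn efilter4 [pred e]
  (let [skip (fn [r eoe]
               (loop [[state x :as r] r]
                 (if (or (identical? eoe x) (pred x))
                   r
                   (recur ((e :enumerators/step) state eoe)))))]
    {:enumerators/init  (fn [eoe] (skip ((e :enumerators/init) eoe) eoe))
     :enumerators/step  (fn [state eoe]
                          (skip ((e :enumerators/step) state eoe) eoe))
     :enumerators/clean (e :enumerators/clean)}))

(ereduce conj [] (efilter4 even? (enum-seq4 (range 7))))
;; => [0 2 4 6]
```

The inner loop in skip is exactly what gets in the way of multiplexing:
the caller cannot regain control between two underlying steps.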



STEP 5: back on track

Now I define an enumerator as three functions:
 > (def a-non-rec-enum
 >   {:create-state (fn [] ; returns a state value
 >                    ......)
 >    ;; returns a ccall or [result final-state]; it's the regular part
 >    ;; of an enumerator as of STEP 2
 >    :step (fn [self state f acc]
 >            ......)
 >    :destroy-state (fn [state] ; destroy the state (release resources)
 >                     ......)})

The updated ereduce:
 > (defn ereduce [f seed non-rec-enum]
 >   (let [ccall (fn self [& args] (cons self args))
 >         ccall? #(and (seq? %) (= (first %) ccall))
 >         state ((non-rec-enum :create-state))
 >         nre-step (non-rec-enum :step)]
 >     (loop [args [ccall state f seed]]
 >       (let [r (apply nre-step args)]
 >         (if (ccall? r)
 >           (recur r)
 >           (let [[result state] r]
 >             ((non-rec-enum :destroy-state) state)
 >             result))))))

emap: (I'm pretty sure there's a clever/more functional way to write emap)
 > (defn emap [f & nres]
 >   (let [ccall (fn self [& args] (cons self args))
 >         ccall? #(and (seq? %) (= (first %) ccall))
 >         nre-steps (map :step nres)
 >         marker (Object.)]
 >     {:create-state (fn []
 >                      (map (fn [nre]
 >                             (ccall ((nre :create-state))
 >                                    (fn [_ x] x)
 >                                    marker))
 >                           nres))
 >      :step (fn [self state g acc]
 >              (if (every? ccall? state)
 >                (if (some #(identical? marker (last %)) state)
 >                  ;; advance enumerators that haven't "produced" a value yet
 >                  (self (map #(if (identical? marker (last %2)) (apply %1 %2) %2)
 >                             nre-steps state)
 >                        g acc)
 >                  ;; all enumerators are ready: reduce and advance them all!
 >                  (self (map (fn [step [self state f _]] (step self state f marker))
 >                             nre-steps state)
 >                        g (g acc (apply f (map last state)))))
 >                [acc state]))
 >      :destroy-state (fn [state]
 >                       ;; a "parallel" doseq would be handy :-)
 >                       (dorun (map #((%1 :destroy-state) (second %2)) nres state)))}))

helper function to turn a seqable into an enumerator:
 > (defn enum-seq [c]
 >   {:create-state (fn [] (seq c))
 >    :step (fn [self state f acc]
 >            (if-let [[fst & rst] state]
 >              (self rst f (f acc fst))
 >              [acc nil]))
 >    :destroy-state (fn [state])})

and another to turn an enumerator into a vector:
 > (defn v [e] (ereduce conj [] e))

efilter:
 > (defn efilter [pred {:keys [create-state step destroy-state]}]
 >   {:create-state create-state
 >    :step (fn [self state f acc]
 >            (step self state #(if (pred %2) (f %1 %2) %1) acc))
 >    :destroy-state destroy-state})

a simple io-based enumerator constructor:
 > (defn enum-lines [filename]
 >   {:create-state (fn []
 >                    (-> filename java.io.FileReader. java.io.BufferedReader.))
 >    :step (fn [self ^java.io.BufferedReader rdr f acc]
 >            (if-let [line (.readLine rdr)]
 >              (self rdr f (f acc line))
 >              [acc rdr]))
 >    :destroy-state (fn [^java.io.BufferedReader rdr]
 >                     (.close rdr))})
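
To close the loop with STEP 0, here is a sketch of count-words on top of
these definitions. ereduce and enum-lines are repeated (in current
syntax) so that the snippet runs on its own; count-words itself is the
only addition.

```clojure
(import '(java.io BufferedReader FileReader))

(defn ereduce [f seed nre]
  (let [ccall    (fn self [& args] (cons self args))
        ccall?   #(and (seq? %) (= (first %) ccall))
        nre-step (:step nre)]
    (loop [args [ccall ((:create-state nre)) f seed]]
      (let [r (apply nre-step args)]
        (if (ccall? r)
          (recur r)
          (let [[result state] r]
            ((:destroy-state nre) state) ; the reader is closed here
            result))))))

(defn enum-lines [filename]
  {:create-state  (fn [] (BufferedReader. (FileReader. (str filename))))
   :step          (fn [self ^BufferedReader rdr f acc]
                    (if-let [line (.readLine rdr)]
                      (self rdr f (f acc line))
                      [acc rdr]))
   :destroy-state (fn [^BufferedReader rdr] (.close rdr))})

;; The STEP 0 example: the user code still never touches the reader.
(defn count-words [filename]
  (ereduce (fn [c line] (+ c (count (.split ^String line " "))))
           0
           (enum-lines filename)))
```

Note that the reader is only closed on normal completion; an exception
thrown by the reducing function would leak it in this sketch.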

Ok, let's test them:
 > user=> (def e1 (enum-seq (range 30)))
 > #'user/e1
 > user=> (def e2 (enum-seq "ABCDEFGHIJ"))
 > #'user/e2
 > user=> (v (emap vector e1 e2))
 > [[0 \A] [1 \B] [2 \C] [3 \D] [4 \E] [5 \F] [6 \G] [7 \H] [8 \I] [9 \J]]
 > user=> (v (efilter #(zero? (rem % 2)) e1))
 > [0 2 4 6 8 10 12 14 16 18 20 22 24 26 28]
 > user=> (v (emap vector (efilter #(zero? (rem % 2)) e1) e2))
 > [[0 \A] [2 \B] [4 \C] [6 \D] [8 \E] [10 \F] [12 \G] [14 \H] [16 \I] [18 \J]]
 > user=> (v (enum-lines "/tmp/test-enums"))
 > ["This is line one," "this is line two," "this is line three,"
 >  "this is line four," "this is line five."]
 > user=> (v (efilter #(= 18 (count %)) (enum-lines "/tmp/test-enums")))
 > ["this is line four," "this is line five."]

That's all for this post. I haven't addressed early termination yet, and
I haven't even made up my mind between the reducing function returning
an out-of-band value or throwing a canned Throwable. (I don't consider
the option of having the reducing function return [mustStop value]
because I'd like to be able to pass "normal" functions.)

The idea would be something along these lines:
 > (defn enil? [e]
 >   (ereduce (fn [x _] (if x false (stop-enumeration))) true e))
and not:
 > (defn enil? [e]
 >   (ereduce (fn [_ _] (stop-and-return false)) true e))

I think that one can write a function to transform a basic enumerator 
(such as those returned by enum-seq or enum-lines) into an 
early-termination-aware one.
About multiplexing, there are some details to sort out, but I think that
enumerators returned by emap can be made multiplexable if the
mapped-over enumerators are also multiplexable.

Christophe
-- 
http://cgrand.net/
Clojure and me http://clj-me.blogspot.com/

