I do have one last question for you on the topic. I'm thinking of using the function below within a core.async context — specifically within a `go` block:

(defn reduce-indexed [f init xs]
  (reduce (let [i (mutable-long -1)]
            (fn [ret x] (f ret (mutable-swap! i inc) x)))
          init xs))

Are you saying that *any* unsynchronized, non-volatile mutable operation is unsafe within a core.async context, or specifically within transducers when applied to e.g. `chan`? That is, when are thread context switches possible — specifically within `go` blocks, where the blocking takes and puts are transformed into asynchronous takes and puts participating in a state machine, like so?

(go (let [x (unsynchronized-mutable 0)]
      (println "thread 1" (Thread/currentThread))
      (reset! x 1)
      (<! c) ; defined elsewhere
      (println "possibly not thread 1" (Thread/currentThread))
      (println "x =" @x))) ; possibly cached in memory and may thus still read as x=0, so maybe x should be `volatile!`
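For reference, `mutable-long` and `mutable-swap!` above are hypothetical unsynchronized-box helpers of my own; the plain `volatile!` version I'd fall back to, if unsynchronized mutation turns out to be unsafe here, would be a minimal sketch like this:

(defn reduce-indexed-volatile [f init xs]
  ;; Same shape as above, but the index lives in a clojure.core volatile,
  ;; so its most recent value stays visible across whatever threads a go
  ;; block hops between (assuming only one thread touches it at a time).
  (reduce (let [i (volatile! -1)]
            (fn [ret x] (f ret (vswap! i inc) x)))
          init xs))

;; (reduce-indexed-volatile (fn [acc i x] (conj acc [i x])) [] [:a :b :c])
;; ;=> [[0 :a] [1 :b] [2 :c]]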
Thanks!

On Sunday, April 9, 2017 at 9:39:35 PM UTC-4, Alexander Gunnarson wrote:
>
> You make a very good point. I had been under the misimpression that you could make an `r/folder` out of any thread-safe transducer like so, and it would work out of the box:
>
> (defn broken-reducers-map-indexed [f coll]
>   (r/folder coll (map-indexed-transducer-concurrently-multi-threaded f)))
>
> and then you could use it like so:
>
> (->> (range 10 20) vec (broken-reducers-map-indexed vector) (r/fold ...))
>
> However, while the indices *do* all appear (unlike in the case of the `volatile`-using transducer), they are out of order, unlike the (indexed) elements of the original range, which do not rely on a stateful transducer to keep track of the current index. So you're right — a subtler implementation is required here that sadly isn't as simple as I had thought (just reusing transducers).
>
> You could wrap stateful transducers in an `r/reducer` for use with the threading macro, but they wouldn't be foldable:
>
> (defn non-foldable-reducers-map-indexed [f coll]
>   (r/reducer coll (core/map-indexed f)))
>
> (->> (range 10 20) vec (non-foldable-reducers-map-indexed vector) (fold ...)) ; won't employ parallelism
>
> That said, it seems to me that you *can* use stateless transducers like `map` in any context (single-threaded, sequentially multi-threaded, or concurrent) and get consistent results:
>
> (defn reducers-map-implemented-with-transducer [f coll]
>   (r/folder coll (core/map f)))
>
> I guess the pipe dream of writing any transducer, stateful or not, and getting a parallel-ready transformation out of it by wrapping it in an `r/folder` is gone. If I remember correctly, the `tesser` library won't help here. I might end up coding up something to ameliorate the situation, because I was planning on being able to just do e.g.
>
> (->> (range 10 20)
>      (r/map ...)
>      (reducers-map-indexed vector)
>      ...
>      (fold ...))
>
> Anyway, thanks so much for your insights! I appreciate you taking the time to share them!
>
> On Sunday, April 9, 2017 at 7:16:18 PM UTC-4, tbc++ wrote:
>>
>> In your example transducer, the problem is with the `result` parameter. The specification of transducers is that the result of `(rf result x)` should be fed into the next call to `rf`. In other words: `(-> result (rf x1) (rf x2) (rf x3))`. Trying to do that in a parallel context is next to impossible. Not saying there isn't a way to code a transducer-like thing to work with multiple threads, but the result of that would look a lot more like core.async or Reactive Extensions than the transducers we have today.
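To make that result-threading point concrete for myself: a completed reducing function built by a transducer really is just folded left to right, with each return value becoming the next call's `result`. A minimal sketch:

;; `((map inc) conj)` is the reducing function the `map` transducer builds on
;; top of `conj`; each call's return value has to be threaded into the next
;; call, which is exactly what a left-to-right reduce does.
(let [rf ((map inc) conj)]
  (-> [] (rf 1) (rf 2) (rf 3))) ;=> [2 3 4], i.e. essentially (reduce rf [] [1 2 3])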
>>
>> On Sun, Apr 9, 2017 at 4:57 PM, Alexander Gunnarson <alexander...@gmail.com> wrote:
>>
>>> That makes sense about them not being designed for that use case. I would add, though, that transducers could certainly be used in a parallel context *if* the current transducer implementations were abstracted such that you could pass in internal-state generator and modifier functions and use the correct ones in whichever context is appropriate (single-threaded read/write, sequentially multi-threaded read/write à la core.async, concurrently multi-threaded read/write à la core.reducers). In the case of `map-indexed`, the fact that its transducer uses a volatile as currently implemented is not part of the `map-indexed` "contract", if you will, and seems to me to be an implementation detail. One could just as easily write the transducer for `map-indexed` as below:
>>>
>>> (defn map-indexed-transducer-base [f box-mutable inc-mutable]
>>>   (fn [rf]
>>>     (let [i (box-mutable -1)]
>>>       (fn
>>>         ([] (rf))
>>>         ([result] (rf result))
>>>         ([result input]
>>>          (rf result (f (inc-mutable i) input)))))))
>>>
>>> (defn map-indexed-transducer-single-threaded [f]
>>>   (map-indexed-transducer-base f unsynchronized-mutable-long! #(unsynchronized-mutable-swap! % inc)))
>>>
>>> (defn map-indexed-transducer-sequentially-multi-threaded [f]
>>>   (map-indexed-transducer-base f volatile! #(vswap! % inc)))
>>>
>>> (defn map-indexed-transducer-concurrently-multi-threaded [f]
>>>   (map-indexed-transducer-base f atom #(swap! % inc))) ; or an AtomicLong variant
>>>
>>> On Sunday, April 9, 2017 at 6:47:46 PM UTC-4, tbc++ wrote:
>>>>
>>>> Transducers were never designed to work in a parallel context. So I'd define any behavior that arises from using the same transducers in multiple threads *at the same time* as undefined behavior.
>>>>
>>>> On Sun, Apr 9, 2017 at 4:39 PM, Alexander Gunnarson <alexander...@gmail.com> wrote:
>>>>
>>>>> I should add that, as Timothy pointed out, if multiple threads mutate and read the value but only one ever does so at a time, as is the case in `core.async`, then a volatile is sufficient. My preliminary conclusions above about volatiles apply only to concurrent mutation via e.g. `fold` or the like.
>>>>>
>>>>> Also, regarding the locks you mentioned, Seth, I read up a little on the Java memory model here <http://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html#synchronization> and I can confirm that a lock is sufficient to provide *both* write *and* read thread-safety guarantees:
>>>>>
>>>>>> ... acquir[ing a] monitor ... has the effect of invalidating the local processor cache so that variables will be reloaded from main memory. We will then be able to see all of the writes made visible by the previous release.
>>>>>
>>>>> `Volatile` only provides a subset of these read-safety guarantees, so a `volatile` in addition to a lock is indeed overkill, if that's what is happening.
>>>>>
>>>>> On Sunday, April 9, 2017 at 6:19:51 PM UTC-4, Alexander Gunnarson wrote:
>>>>>>
>>>>>> It looks that way to me too, Seth, though I'd have to comb over the details of the locks implemented there to give a reasoned opinion of my own. But yes, if that's the case, the volatile isn't adding anything.
>>>>>>
>>>>>> Anyway, I'm not trying to poke holes in the current implementation of transducers — on the contrary, I'm very appreciative of and impressed by the efforts the clojure.core (and core.async) contributors have made on that and other fronts. Transducers are an extremely powerful and elegant way to express code that would otherwise be a lot more complex and difficult to reason about. I'm just trying to figure out where I can get away with having unsynchronized mutable versions of stateful transducers that currently use volatiles, and where I need even stronger measures of thread safety than volatiles.
>>>>>>
>>>>>> To take these thoughts further, I did a simple test to compare the three types of mutability we've been talking about (unsynchronized, volatile, and atomic — I can reproduce the code here if you'd like), and the takeaway is that `map-indexed` really does rely on atomic operations in a multithreaded context, as each index depends on the previous index value. When doing a `volatile`-based `map-indexed` in parallel on a small collection (8 elements), the `volatile` value stays consistent — that is, all the correct indices are passed to the mapping function. However, over a sufficiently large collection (100 elements, though it could happen on smaller scales too), the `volatile` value starts to break down: duplicate index values are passed to the mapping function, and the highest index value tops out at 97. The same phenomenon happens, of course, with the unsynchronized-mutable-box-based `map-indexed`, though it happens at a small scale too (calling the unsynchronized `map-indexed` on 8 elements operated on by 2 threads produces only 7 unique indices).
>>>>>>
>>>>>> My preliminary conclusions are:
>>>>>> - Unsynchronized mutability is fine in contexts known to be only single-threaded, in which case I could replace the `volatile` in `map-indexed` and other transducers with unsynchronized mutable boxes.
>>>>>> - Volatiles are good when all you want to do is set the value and have multiple threads always read the most up-to-date value, without having to depend on a previous value via e.g. `inc`.
>>>>>> - Atomic boxes (`atom`, `AtomicLong`, etc.) are necessary when the mutable value relies on the previous value via e.g. `inc`, as is the case with `map-indexed`.
>>>>>>
>>>>>> My guess is that all this applies to e.g. the unsynchronized `ArrayList` in `partition-by` as well, which might need to be a synchronized collection or an immutable one boxed in an atom, but I haven't tested this.
>>>>>>
>>>>>> Would you agree with these conclusions, Seth and Timothy?
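A minimal sketch of the kind of comparison described above (not the exact test, and `lost-updates` is a name made up here for illustration): bump one counter from several threads with each flavor of box and count the increments that go missing.

;; Count how many increments are lost when several threads bump a counter
;; held in the given kind of mutable box.
(defn lost-updates [make-box bump!]
  (let [box        (make-box 0)
        n-threads  8
        per-thread 10000
        threads    (mapv (fn [_] (Thread. #(dotimes [_ per-thread] (bump! box))))
                         (range n-threads))]
    (run! #(.start ^Thread %) threads)
    (run! #(.join ^Thread %) threads)
    (- (* n-threads per-thread) @box)))

;; (lost-updates atom      #(swap! % inc))  ; 0: swap! is an atomic read-modify-write
;; (lost-updates volatile! #(vswap! % inc)) ; usually > 0: vswap! is not atomic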
>>>>>>
>>>>>> On Sunday, April 9, 2017 at 1:56:38 PM UTC-4, Seth Verrinder wrote:
>>>>>>>
>>>>>>> I'll defer to Timothy on the particulars of core.async, but it looks like [1] the transducer in a channel is protected by a lock. If that's the case, volatile isn't adding anything in terms of memory barriers.
>>>>>>>
>>>>>>> 1: https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L71
>>>>>>>
>>>>>>> On Sunday, April 9, 2017 at 11:58:00 AM UTC-5, Alexander Gunnarson wrote:
>>>>>>>>
>>>>>>>> Thanks so much for your well-considered reply, Timothy! That makes sense about volatiles being used in e.g. core.async or core.reducers contexts, where the reducing function that closes over the mutable value of the stateful transducer is called in different threads. Why, then, is an unsynchronized ArrayList used e.g. in `partition-by`? It's also closed over by the reducing function in just the same way as the volatile long value internal to e.g. `map-indexed`. I'm not yet clear on why it's acceptable for one (the ArrayList) to be non-volatile while the other (the long) needs to be volatile. When `.add` is called, an unsynchronized mutable counter is updated so the ArrayList can insert the next value at the correct index. Do you have any insight into this? Meanwhile, I'll go do some digging myself on the Clojure JIRA etc. so I'm more informed on the subject.
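For concreteness, the pattern in question, a reducing function mutating a closed-over ArrayList, looks roughly like the sketch below. It follows the simpler `partition-all` shape rather than the exact `partition-by` source in clojure.core, so take it as illustrative only:

;; Illustrative sketch of a stateful transducer whose reducing function
;; mutates a closed-over java.util.ArrayList via .add/.clear, with no
;; volatile in sight.
(defn partition-all-sketch [n]
  (fn [rf]
    (let [buf (java.util.ArrayList.)]
      (fn
        ([] (rf))
        ([result]
         (let [result (if (.isEmpty buf)
                        result
                        (let [chunk (vec (.toArray buf))]
                          (.clear buf)
                          (unreduced (rf result chunk))))]
           (rf result)))
        ([result input]
         (.add buf input)
         (if (= n (.size buf))
           (let [chunk (vec (.toArray buf))]
             (.clear buf)
             (rf result chunk))
           result))))))

;; (into [] (partition-all-sketch 2) (range 5)) ;=> [[0 1] [2 3] [4]]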