I do have one last question for you on the topic. I'm thinking of using the 
below function within a core.async context — specifically within a `go` 
block:

(defn reduce-indexed [f init xs]
  (reduce (let [i (mutable-long -1)]
            (fn [ret x] (f ret (mutable-swap! i inc) x))) 
          init xs))
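
(`mutable-long` and `mutable-swap!` above are my own helpers, not clojure.core functions. For anyone who wants to run this at a REPL, here is a minimal sketch that assumes a plain `volatile!` stands in for the unsynchronized box; the behavior in a single thread should be the same, only the memory-visibility guarantees differ.)

```clojure
;; Sketch only: `volatile!` / `vswap!` stand in for the hypothetical
;; `mutable-long` / `mutable-swap!` helpers used in the version above.
(defn reduce-indexed
  "Like `reduce`, but calls (f acc i x), where i is the zero-based index of x."
  [f init xs]
  (let [i (volatile! -1)]
    ;; vswap! returns the new value, so the first element sees index 0.
    (reduce (fn [ret x] (f ret (vswap! i inc) x))
            init
            xs)))

(reduce-indexed (fn [acc i x] (conj acc [i x])) [] [:a :b :c])
;; => [[0 :a] [1 :b] [2 :c]]
```

Each call to `reduce-indexed` creates a fresh counter, so the box never escapes the call; the open question is only what happens when that call itself is suspended and resumed on another thread.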

Are you saying that *any* unsynchronized, non-volatile mutable operation is 
unsafe within a core.async context, or specifically within transducers when 
applied to e.g. `chan`? That is, when are thread context switches possible 
— specifically within `go` blocks where the blocking takes and puts are 
transformed into asynchronous takes and puts participating in a state 
machine, like so?:

(go (let [x (unsynchronized-mutable 0)]
      (println "thread 1" (Thread/currentThread))
      (reset! x 1)
      (<! c) ; defined elsewhere
      (println "possibly not thread 1" (Thread/currentThread))
      ;; x is possibly cached and may thus still read as 0,
      ;; so maybe x should be a `volatile!`
      (println "x =" @x)))

Thanks!

On Sunday, April 9, 2017 at 9:39:35 PM UTC-4, Alexander Gunnarson wrote:
>
> You make a very good point. I had been under the misimpression that you 
> could make an `r/folder` out of any thread-safe transducer like so, and it 
> would work out of the box:
>
> (defn broken-reducers-map-indexed [f coll]
>   (r/folder coll (map-indexed-transducer-concurrently-multi-threaded f)))
>
> and then you could use it like so:
>
> (->> (range 10 20) vec (broken-reducers-map-indexed vector) (r/fold ...))
>
> However, while the indices *do* all appear (unlike in the case of the 
> `volatile`-using transducer), they are out of order, unlike the (indexed) 
> elements of the original range which do not rely on a stateful transducer 
> to keep track of the current index. So you're right — a subtler 
> implementation is required here that sadly isn't as simple as I had thought 
> (just reusing transducers).
>
> You could wrap stateful transducers in an `r/reducer` for use with the 
> threading macro, but they wouldn't be foldable:
>
> (defn non-foldable-reducers-map-indexed [f coll]
>   (r/reducer coll (core/map-indexed f)))
>
> ;; won't employ parallelism
> (->> (range 10 20) vec (non-foldable-reducers-map-indexed vector) (fold ...))
>
> That said, it seems to me that you *can* use stateless transducers like 
> `map` in any context (single-threaded, sequentially multi-threaded, or 
> concurrent) and get consistent results:
>
> (defn reducers-map-implemented-with-transducer [f coll]
>   (r/folder coll (core/map f)))
>
> I guess the pipe dream of writing any transducer, stateful or not, and 
> getting a parallel-ready transformation out of it by wrapping it in an 
> `r/folder` is gone. If I remember correctly, the `tesser` library won't 
> help here. I might end up coding up something to ameliorate the situation 
> because I was planning on being able to just do e.g.
>
> (->> (range 10 20) 
>      (r/map ...) 
>      (reducers-map-indexed vector) 
>      ... 
>      (fold ...))
>
> Anyway, thanks so much for your insights! I appreciate you taking the time 
> to share them!
>
> On Sunday, April 9, 2017 at 7:16:18 PM UTC-4, tbc++ wrote:
>>
>> In your example transducer, the problem is with the `result` parameter. 
>> The specification of transducers is that the result of `(rf result x)` 
>> should be fed into the next call to `rf`. In other words: 
>> `(-> result (rf x1) (rf x2) (rf x3))`. Trying to do that in a parallel 
>> context is next to impossible. I'm not saying there isn't a way to code a 
>> transducer-like thing that works with multiple threads, but the result 
>> would look a lot more like core.async or Reactive Extensions than the 
>> transducers we have today. 
>>
>> On Sun, Apr 9, 2017 at 4:57 PM, Alexander Gunnarson <
>> alexander...@gmail.com> wrote:
>>
>>> That makes sense about them not being designed for that use case. I 
>>> would add, though, that transducers could certainly be used in a parallel 
>>> context *if* the current transducer implementations were abstracted such 
>>> that you could pass internal state generator and modifier functions and use 
>>> the correct ones in whichever context is appropriate (single-threaded 
>>> read/write, sequentially multi-threaded read/write à la core.async, 
>>> concurrently multi-threaded read/write à la core.reducers). In the case of 
>>> `map-indexed`, the fact that its transducer uses a volatile as currently 
>>> implemented is not part of the `map-indexed` "contract", if you will, and 
>>> seems to me to be an implementation detail. One could just as easily write 
>>> the transducer for `map-indexed` as below:
>>>
>>> (defn map-indexed-transducer-base [f box-mutable inc-mutable]
>>>   (fn [rf]
>>>     (let [i (box-mutable -1)]
>>>       (fn
>>>         ([] (rf))
>>>         ([result] (rf result))
>>>         ([result input]
>>>           (rf result (f (inc-mutable i) input)))))))
>>>
>>> (defn map-indexed-transducer-single-threaded [f]
>>>   (map-indexed-transducer-base f unsynchronized-mutable-long!
>>>                                #(unsynchronized-mutable-swap! % inc)))
>>>
>>> (defn map-indexed-transducer-sequentially-multi-threaded [f]
>>>   (map-indexed-transducer-base f volatile! #(vswap! % inc)))
>>>
>>> ;; or an AtomicLong variant
>>> (defn map-indexed-transducer-concurrently-multi-threaded [f]
>>>   (map-indexed-transducer-base f atom #(swap! % inc)))
>>>
>>>
>>> On Sunday, April 9, 2017 at 6:47:46 PM UTC-4, tbc++ wrote:
>>>>
>>>> Transducers were never designed to work in a parallel context. So I'd 
>>>> define any behavior that arises from using the same transducers in 
>>>> multiple threads *at the same time* as undefined behavior. 
>>>>
>>>> On Sun, Apr 9, 2017 at 4:39 PM, Alexander Gunnarson <
>>>> alexander...@gmail.com> wrote:
>>>>
>>>>> I should add that, as Timothy pointed out, if multiple threads mutate 
>>>>> and read the value but only one ever does so at a time, as is the case in 
>>>>> `core.async`, then a volatile is sufficient. My preliminary conclusions 
>>>>> above about volatiles apply only to concurrent mutation via e.g. `fold` 
>>>>> or the like.
>>>>>
>>>>> Also, regarding the locks you mentioned, Seth, I read up a little on 
>>>>> the Java memory model here 
>>>>> <http://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html#synchronization>
>>>>>  
>>>>> and I can confirm that a lock is sufficient to provide *both* write *and* 
>>>>> read thread-safety guarantees: 
>>>>>
>>>>> ... acquir[ing a] monitor ... has the effect of invalidating the local 
>>>>>> processor cache so that variables will be reloaded from main memory. We 
>>>>>> will then be able to see all of the writes made visible by the previous 
>>>>>> release.
>>>>>>
>>>>>
>>>>> `Volatile` only provides a subset of these read-safety guarantees, so 
>>>>> a `volatile` in addition to a lock is indeed overkill, if that's what is 
>>>>> happening.
>>>>>
>>>>> On Sunday, April 9, 2017 at 6:19:51 PM UTC-4, Alexander Gunnarson 
>>>>> wrote:
>>>>>>
>>>>>> It looks that way to me too, Seth, though I'd have to comb over the 
>>>>>> details of the locks implemented there to give a reasoned opinion of my 
>>>>>> own. But yes, if that's the case, the volatile isn't adding anything.
>>>>>>
>>>>>> Anyway, I'm not trying to poke holes in the current implementation of 
>>>>>> transducers — on the contrary, I'm very appreciative of and impressed by 
>>>>>> the efforts the clojure.core (and core.async) contributors have made on 
>>>>>> that and other fronts. Transducers are an extremely powerful and elegant 
>>>>>> way to express code that would otherwise be a lot more complex and 
>>>>>> difficult to reason about. I'm just trying to figure out where I can get 
>>>>>> away with having unsynchronized mutable versions of stateful transducers 
>>>>>> that currently use volatiles, and where I need even stronger measures of 
>>>>>> thread safety than volatiles.
>>>>>>
>>>>>> To take these thoughts further, I did a simple test to compare the 
>>>>>> three types of mutability we've been talking about (unsynchronized, 
>>>>>> volatile, and atomic; I can reproduce the code here if you'd like). The 
>>>>>> takeaway is that `map-indexed` really does rely on atomic operations in 
>>>>>> a multithreaded context, as each index depends on the previous index 
>>>>>> value. When doing a `volatile`-based `map-indexed` in parallel on a 
>>>>>> small collection (8 elements), the `volatile` value stays consistent: 
>>>>>> all the correct indices are passed to the mapping function. However, 
>>>>>> over a sufficiently large collection (100 elements, though it could 
>>>>>> happen on smaller scales too), the `volatile` value starts to break 
>>>>>> down: duplicate index values are passed to the mapping function and the 
>>>>>> highest index value only ever reaches 97. The same phenomenon happens, 
>>>>>> of course, with the unsynchronized-mutable-box-based `map-indexed`, 
>>>>>> though it happens at a small scale too (calling the unsynchronized 
>>>>>> `map-indexed` on 8 elements operated on by 2 threads produces only 7 
>>>>>> unique indices).
>>>>>>
>>>>>> My preliminary conclusions are:
>>>>>> - Unsynchronized mutability is fine in contexts known to be only 
>>>>>>   single-threaded, in which I could replace the `volatile` in 
>>>>>>   `map-indexed` and other transducers with unsynchronized mutable boxes.
>>>>>> - Volatiles are good when all you want to do is set the value and 
>>>>>>   have multiple threads always read the most up-to-date value, without 
>>>>>>   having to depend on a previous value via e.g. `inc`.
>>>>>> - Atomic boxes (`atom`, `AtomicLong`, etc.) are necessary when the 
>>>>>>   mutable value relies on the previous value via e.g. `inc`, as is the 
>>>>>>   case with `map-indexed`.
>>>>>>
>>>>>> My guess is that all this applies to e.g. the unsynchronized 
>>>>>> `ArrayList` in `partition-by` as well, which might need to be a 
>>>>>> synchronized collection or an immutable one boxed in an atom, but I 
>>>>>> haven't tested this.
>>>>>>
>>>>>> Would you agree with these conclusions, Seth and Timothy?
>>>>>>
>>>>>> On Sunday, April 9, 2017 at 1:56:38 PM UTC-4, Seth Verrinder wrote:
>>>>>>>
>>>>>>> I'll defer to Timothy on the particulars of core.async, but it looks 
>>>>>>> like [1] the transducer in a channel is protected by a lock. If that's 
>>>>>>> the case, volatile isn't adding anything in terms of memory barriers.
>>>>>>>
>>>>>>> 1: 
>>>>>>> https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L71
>>>>>>>
>>>>>>> On Sunday, April 9, 2017 at 11:58:00 AM UTC-5, Alexander Gunnarson 
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Thanks so much for your well-considered reply, Timothy! That makes 
>>>>>>>> sense about volatiles being used in e.g. core.async or core.reducers 
>>>>>>>> contexts where the reducing function that closes over the mutable 
>>>>>>>> value of the stateful transducer is called in different threads. Why, 
>>>>>>>> then, are unsynchronized ArrayLists used e.g. in `partition-by`? The 
>>>>>>>> ArrayList is also closed over by the reducing function in just the 
>>>>>>>> same way as the volatile long value internal to e.g. `map-indexed`. 
>>>>>>>> I'm not yet clear on how it is acceptable for one (the ArrayList) to 
>>>>>>>> be non-volatile while the other (the long) has to be volatile. When 
>>>>>>>> .add is called, an unsynchronized mutable counter is updated so the 
>>>>>>>> ArrayList can insert the next value at the correct index. Do you have 
>>>>>>>> any insight into this? Meanwhile I'll go do some digging myself on 
>>>>>>>> the Clojure JIRA etc. so I'm more informed on the subject. 
>>>>>>>
>>>>>>> -- 
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Clojure" group.
>>>>> To post to this group, send email to clo...@googlegroups.com
>>>>> Note that posts from new members are moderated - please be patient 
>>>>> with your first post.
>>>>> To unsubscribe from this group, send email to
>>>>> clojure+u...@googlegroups.com
>>>>> For more options, visit this group at
>>>>> http://groups.google.com/group/clojure?hl=en
>>>>> --- 
>>>>> You received this message because you are subscribed to the Google 
>>>>> Groups "Clojure" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>>> an email to clojure+u...@googlegroups.com.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>>>
>>>>
>>>> -- 
>>>> “One of the main causes of the fall of the Roman Empire was 
>>>> that–lacking zero–they had no way to indicate successful termination of 
>>>> their C programs.”
>>>> (Robert Firth) 
>>>>
>>>
>>
>>
>>
>>
>
