I'm leaning towards option 1. Are you thinking about also allowing the consumer to modify the entry, like JCache's EntryProcessors? For a consumer that can only modify the current entry, we could even "emulate" locking in an optimistic cache by catching the WriteSkewException and running the consumer again.
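To make the retry idea concrete, here is a rough sketch. It does not use any Infinispan API: VersionedStore, its read/commit methods, OptimisticRetry, and the local WriteSkewException class are all hypothetical stand-ins for an optimistic cache that rejects a commit when the entry changed after it was read. It only illustrates the "catch the write skew and run the consumer again" loop.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Stand-in for the cache's write skew failure; NOT the real Infinispan class.
class WriteSkewException extends RuntimeException {
    WriteSkewException(String message) { super(message); }
}

// Hypothetical toy optimistic store: each value carries a version, and a
// commit fails if the entry was replaced since it was read.
class VersionedStore<K, V> {
    static final class Versioned<V> {
        final V value;
        final long version;
        Versioned(V value, long version) { this.value = value; this.version = version; }
    }

    private final Map<K, Versioned<V>> data = new ConcurrentHashMap<>();

    // Unconditional write, bumping the version if the key already exists.
    void put(K key, V value) {
        data.merge(key, new Versioned<>(value, 0L),
                (old, fresh) -> new Versioned<>(fresh.value, old.version + 1));
    }

    Versioned<V> read(K key) { return data.get(key); }

    // Commit only if the stored entry is still the exact instance we read.
    void commit(K key, Versioned<V> seen, V newValue) {
        boolean swapped = data.replace(key, seen, new Versioned<>(newValue, seen.version + 1));
        if (!swapped) {
            throw new WriteSkewException("concurrent update on key " + key);
        }
    }
}

final class OptimisticRetry {
    // Applies 'update' to the current value and retries on write skew.
    // Returns the number of attempts that were needed.
    static <K, V> int updateWithRetry(VersionedStore<K, V> store, K key,
                                      UnaryOperator<V> update, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            VersionedStore.Versioned<V> seen = store.read(key);
            if (seen == null) {
                throw new IllegalArgumentException("no entry for key " + key);
            }
            V newValue = update.apply(seen.value);
            try {
                store.commit(key, seen, newValue);
                return attempt;
            } catch (WriteSkewException e) {
                // Another writer won the race; re-read and run the consumer again.
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }
}
```

Since the consumer may run more than once, this only makes sense if its sole effect is modifying the current entry, which is the restriction mentioned above.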
I wouldn't allow this to be mixed with other operations in a stream, because then you may have to run filters/mappers/sorting while holding the lock as well.

Cheers
Dan

On Tue, Mar 21, 2017 at 5:37 PM, William Burns <mudokon...@gmail.com> wrote:
> Some users have expressed the need to have some sort of forEach operation
> that is performed where the Consumer is called while holding the lock for
> the given key and subsequently released after the Consumer operation
> completes.
>
> Due to the nature of how streams work with retries and performing the
> operation on the primary owner, this works out quite well with forEach to be
> done in an efficient way.
>
> The problem is that this only really works well with non-tx and pessimistic
> tx. This obviously leaves out optimistic tx, which at first I was a little
> worried about. But after thinking about it more, this prelocking and
> optimistic tx don't really fit that well together anyway. So I am thinking
> whenever this operation is performed it would throw an exception not letting
> the user use this feature in optimistic transactions.
>
> Another question is what the API for this should look like. I was debating
> between 3 options myself:
>
> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)
>
> This requires the fewest changes, however the user can't customize
> certain parameters that CacheStream currently provides (listed below - big
> one being filterKeys).
>
> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)
>
> This method would only be allowed to be invoked on the Stream if no other
> intermediate operations were invoked, otherwise an exception would be
> thrown. This still gives us access to all of the CacheStream methods that
> aren't on the Stream interface (i.e. sequentialDistribution,
> parallelDistribution, parallel, sequential, filterKeys, filterKeySegments,
> distributedBatchSize, disableRehashAware, timeout).
>
> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>
> This requires the most changes, however the API would be the most explicit.
> In this case the LockedStream would only have the methods on it that are
> able to be invoked as noted above and forEach.
>
> I personally feel that #3 might be the cleanest, but obviously requires
> adding more classes. Let me know what you guys think and if you think the
> optimistic exclusion is acceptable.
>
> Thanks,
>
> - Will
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev@lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev