Repository: groovy
Updated Branches:
  refs/heads/master bf1dc7b1b -> 8e6dd80e1

Improve the performance of `ConcurrentCommonCache`(3.0.0+ Only)

Project: http://git-wip-us.apache.org/repos/asf/groovy/repo
Commit: http://git-wip-us.apache.org/repos/asf/groovy/commit/8e6dd80e
Tree: http://git-wip-us.apache.org/repos/asf/groovy/tree/8e6dd80e
Diff: http://git-wip-us.apache.org/repos/asf/groovy/diff/8e6dd80e

Branch: refs/heads/master
Commit: 8e6dd80e1114b66e1ca482e5ddd436733e532fc2
Parents: bf1dc7b
Author: danielsun1106 <[email protected]>
Authored: Sun Mar 4 01:56:02 2018 +0800
Committer: danielsun1106 <[email protected]>
Committed: Sun Mar 4 01:56:02 2018 +0800

----------------------------------------------------------------------
 .../runtime/memoize/ConcurrentCommonCache.java  | 65 ++++++++++++--------
 .../memoize/ConcurrentCommonCacheTest.java      | 38 ++++++++++++
 2 files changed, 77 insertions(+), 26 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/groovy/blob/8e6dd80e/src/main/java/org/codehaus/groovy/runtime/memoize/ConcurrentCommonCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/codehaus/groovy/runtime/memoize/ConcurrentCommonCache.java b/src/main/java/org/codehaus/groovy/runtime/memoize/ConcurrentCommonCache.java
index 27ee102..a758eb2 100644
--- a/src/main/java/org/codehaus/groovy/runtime/memoize/ConcurrentCommonCache.java
+++ b/src/main/java/org/codehaus/groovy/runtime/memoize/ConcurrentCommonCache.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.StampedLock;
 
 /**
  * Represents a simple key-value cache, which is thread safe and backed by a {@link java.util.Map} instance
@@ -36,9 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class ConcurrentCommonCache<K, V> implements EvictableCache<K, V>, ValueConvertable<V, Object>, Serializable {
     private static final long serialVersionUID = -7352338549333024936L;
 
-    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
-    private final ReentrantReadWriteLock.ReadLock readLock = rwl.readLock();
-    private final ReentrantReadWriteLock.WriteLock writeLock = rwl.writeLock();
+    private final StampedLock sl = new StampedLock();
     private final CommonCache<K, V> commonCache;
 
     /**
@@ -116,35 +114,43 @@ public class ConcurrentCommonCache<K, V> implements EvictableCache<K, V>, ValueC
     public V getAndPut(K key, ValueProvider<? super K, ? extends V> valueProvider, boolean shouldCache) {
         V value;
 
-        readLock.lock();
+        long stamp = sl.readLock();
         try {
             value = commonCache.get(key);
             if (null != convertValue(value)) {
                 return value;
             }
-        } finally {
-            readLock.unlock();
-        }
 
-        writeLock.lock();
-        try {
-            // try to find the cached value again
-            value = commonCache.get(key);
-            if (null != convertValue(value)) {
-                return value;
-            }
+            long ws = sl.tryConvertToWriteLock(stamp);
+            if (0L == ws) { // Failed to convert read lock to write lock
+                sl.unlockRead(stamp);
+                stamp = sl.writeLock();
 
-            value = null == valueProvider ? null : valueProvider.provide(key);
-            if (shouldCache && null != convertValue(value)) {
-                commonCache.put(key, value);
+                // try to find the cached value again
+                value = commonCache.get(key);
+                if (null != convertValue(value)) {
+                    return value;
+                }
+            } else {
+                stamp = ws;
             }
+
+            value = compute(key, valueProvider, shouldCache);
         } finally {
-            writeLock.unlock();
+            sl.unlock(stamp);
         }
 
         return value;
     }
 
+    private V compute(K key, ValueProvider<? super K, ? extends V> valueProvider, boolean shouldCache) {
+        V value = null == valueProvider ? null : valueProvider.provide(key);
+        if (shouldCache && null != convertValue(value)) {
+            commonCache.put(key, value);
+        }
+        return value;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -217,11 +223,11 @@ public class ConcurrentCommonCache<K, V> implements EvictableCache<K, V>, ValueC
      * @param action the content to complete
      */
     public <R> R doWithWriteLock(Action<K, V, R> action) {
-        writeLock.lock();
+        long stamp = sl.writeLock();
         try {
             return action.doWith(commonCache);
         } finally {
-            writeLock.unlock();
+            sl.unlockWrite(stamp);
         }
     }
 
@@ -230,12 +236,19 @@ public class ConcurrentCommonCache<K, V> implements EvictableCache<K, V>, ValueC
      * @param action the content to complete
      */
     public <R> R doWithReadLock(Action<K, V, R> action) {
-        readLock.lock();
-        try {
-            return action.doWith(commonCache);
-        } finally {
-            readLock.unlock();
+        long stamp = sl.tryOptimisticRead();
+        R result = action.doWith(commonCache);
+
+        if (!sl.validate(stamp)) {
+            stamp = sl.readLock();
+            try {
+                result = action.doWith(commonCache);
+            } finally {
+                sl.unlockRead(stamp);
+            }
         }
+
+        return result;
     }
 
     @FunctionalInterface

http://git-wip-us.apache.org/repos/asf/groovy/blob/8e6dd80e/src/test/org/codehaus/groovy/runtime/memoize/ConcurrentCommonCacheTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/codehaus/groovy/runtime/memoize/ConcurrentCommonCacheTest.java b/src/test/org/codehaus/groovy/runtime/memoize/ConcurrentCommonCacheTest.java
index 44c9ae2..d012e99 100644
--- a/src/test/org/codehaus/groovy/runtime/memoize/ConcurrentCommonCacheTest.java
+++ b/src/test/org/codehaus/groovy/runtime/memoize/ConcurrentCommonCacheTest.java
@@ -19,11 +19,19 @@
 package org.codehaus.groovy.runtime.memoize;
 
 import org.apache.groovy.util.Maps;
+import org.apache.groovy.util.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 
 public class ConcurrentCommonCacheTest {
     @Test
@@ -200,4 +208,34 @@ public class ConcurrentCommonCacheTest {
         Assert.assertEquals("3", sc.get("c"));
         Assert.assertEquals("5", sc.get("d"));
     }
+
+    @Test
+    public void testAccessCacheConcurrently() throws InterruptedException {
+        final ConcurrentCommonCache<Integer, Integer> m = new ConcurrentCommonCache<>();
+
+        final int threadNum = 60;
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        final CountDownLatch countDownLatch2 = new CountDownLatch(threadNum);
+
+        final AtomicInteger cnt = new AtomicInteger(0);
+
+        for (int i = 0; i < threadNum; i++) {
+            new Thread(() -> {
+                try {
+                    countDownLatch.await();
+
+                    m.getAndPut(123, k -> cnt.getAndIncrement());
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } finally {
+                    countDownLatch2.countDown();
+                }
+            }).start();
+        }
+
+        countDownLatch.countDown();
+        countDownLatch2.await();
+
+        Assert.assertEquals(1, cnt.get());
+    }
 }
