Repository: hbase Updated Branches: refs/heads/master ebe92c8fb -> d047cc9ec
HBASE-18085 Prevent parallel purge in ObjectPool Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d047cc9e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d047cc9e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d047cc9e Branch: refs/heads/master Commit: d047cc9ecc7e0067c3b3aaeeb1600da8d774b211 Parents: ebe92c8 Author: Yu Li <[email protected]> Authored: Wed May 24 15:42:39 2017 +0800 Committer: Yu Li <[email protected]> Committed: Wed May 24 15:42:39 2017 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/util/ObjectPool.java | 39 +++++++++++++++++--- .../hadoop/hbase/util/SoftObjectPool.java | 21 +++-------- .../hadoop/hbase/util/WeakObjectPool.java | 21 +++-------- 3 files changed, 44 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d047cc9e/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java index f736922..6a04812 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java @@ -22,6 +22,8 @@ import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -52,6 +54,9 @@ public abstract class ObjectPool<K, V> { /** Does not permit null keys. */ protected final ConcurrentMap<K, Reference<V>> referenceCache; + /** For preventing parallel purge */ + private final Lock purgeLock = new ReentrantLock(); + /** * The default initial capacity, * used when not otherwise specified in a constructor. @@ -117,12 +122,29 @@ public abstract class ObjectPool<K, V> { } /** - * Removes stale references of shared objects from the pool. - * References newly becoming stale may still remain. - * The implementation of this method is expected to be lightweight - * when there is no stale reference. + * Removes stale references of shared objects from the pool. References newly becoming stale may + * still remain. + * <p/> + * The implementation of this method is expected to be lightweight when there is no stale + * reference with the Oracle (Sun) implementation of {@code ReferenceQueue}, because + * {@code ReferenceQueue.poll} just checks a volatile instance variable in {@code ReferenceQueue}. */ - public abstract void purge(); + public void purge() { + if (purgeLock.tryLock()) {// no parallel purge + try { + while (true) { + @SuppressWarnings("unchecked") + Reference<V> ref = (Reference<V>) staleRefQueue.poll(); + if (ref == null) { + break; + } + referenceCache.remove(getReferenceKey(ref), ref); + } + } finally { + purgeLock.unlock(); + } + } + } /** * Create a reference associated with the given object @@ -133,6 +155,13 @@ public abstract class ObjectPool<K, V> { public abstract Reference<V> createReference(K key, V obj); /** + * Get key of the given reference + * @param ref The reference + * @return key of the reference + */ + public abstract K getReferenceKey(Reference<V> ref); + + /** * Returns a shared object associated with the given {@code key}, * which is identified by the {@code equals} method. * @throws NullPointerException if {@code key} is null http://git-wip-us.apache.org/repos/asf/hbase/blob/d047cc9e/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java index 7f27f98..19c65b2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java @@ -49,22 +49,6 @@ public class SoftObjectPool<K, V> extends ObjectPool<K, V> { } @Override - public void purge() { - // This method is lightweight while there is no stale reference - // with the Oracle (Sun) implementation of {@code ReferenceQueue}, - // because {@code ReferenceQueue.poll} just checks a volatile instance - // variable in {@code ReferenceQueue}. - while (true) { - @SuppressWarnings("unchecked") - SoftObjectReference ref = (SoftObjectReference) staleRefQueue.poll(); - if (ref == null) { - break; - } - referenceCache.remove(ref.key, ref); - } - } - - @Override public Reference<V> createReference(K key, V obj) { return new SoftObjectReference(key, obj); } @@ -78,4 +62,9 @@ public class SoftObjectPool<K, V> extends ObjectPool<K, V> { } } + @Override + public K getReferenceKey(Reference<V> ref) { + return ((SoftObjectReference) ref).key; + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/d047cc9e/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java index 8529f01..6714034 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java @@ -49,22 +49,6 @@ public class WeakObjectPool<K,V> extends ObjectPool<K,V> { } @Override - public void purge() { - // This method is lightweight while there is no stale reference - // with the Oracle (Sun) implementation of {@code ReferenceQueue}, - // because {@code ReferenceQueue.poll} just checks a volatile instance - // variable in {@code ReferenceQueue}. - while (true) { - @SuppressWarnings("unchecked") - WeakObjectReference ref = (WeakObjectReference) staleRefQueue.poll(); - if (ref == null) { - break; - } - referenceCache.remove(ref.key, ref); - } - } - - @Override public Reference<V> createReference(K key, V obj) { return new WeakObjectReference(key, obj); } @@ -78,4 +62,9 @@ public class WeakObjectPool<K,V> extends ObjectPool<K,V> { } } + @Override + public K getReferenceKey(Reference<V> ref) { + return ((WeakObjectReference)ref).key; + } + }
