HBASE-17747 Support both weak and soft object pool
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/44b25588 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/44b25588 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/44b25588 Branch: refs/heads/hbase-12439 Commit: 44b255889cfb168aaac8adc162f740beb61a7221 Parents: 201c838 Author: Yu Li <[email protected]> Authored: Tue Mar 14 11:07:52 2017 +0800 Committer: Yu Li <[email protected]> Committed: Tue Mar 14 11:07:52 2017 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/util/KeyLocker.java | 2 +- .../apache/hadoop/hbase/util/ObjectPool.java | 174 +++++++++++++++++++ .../hadoop/hbase/util/SoftObjectPool.java | 81 +++++++++ .../hadoop/hbase/util/WeakObjectPool.java | 151 ++-------------- .../hadoop/hbase/util/TestWeakObjectPool.java | 4 +- .../hadoop/hbase/util/IdReadWriteLock.java | 9 +- .../hadoop/hbase/util/TestIdReadWriteLock.java | 5 +- 7 files changed, 285 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java index 6acf584..57e7bb0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java @@ -50,7 +50,7 @@ public class KeyLocker<K> { private final WeakObjectPool<K, ReentrantLock> lockPool = new WeakObjectPool<>( - new WeakObjectPool.ObjectFactory<K, ReentrantLock>() { + new ObjectPool.ObjectFactory<K, ReentrantLock>() { @Override public ReentrantLock createObject(K key) { return new ReentrantLock(); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java new file mode 100644 index 0000000..f736922 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A thread-safe shared object pool in which object creation is expected to be lightweight, and the + * objects may be excessively created and discarded. + */ +@InterfaceAudience.Private +public abstract class ObjectPool<K, V> { + /** + * An {@code ObjectFactory} object is used to create + * new shared objects on demand. 
+ */ + public interface ObjectFactory<K, V> { + /** + * Creates a new shared object associated with the given {@code key}, + * identified by the {@code equals} method. + * This method may be simultaneously called by multiple threads + * with the same key, and the excessive objects are just discarded. + */ + V createObject(K key); + } + + protected final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>(); + + private final ObjectFactory<K, V> objectFactory; + + /** Does not permit null keys. */ + protected final ConcurrentMap<K, Reference<V>> referenceCache; + + /** + * The default initial capacity, + * used when not otherwise specified in a constructor. + */ + public static final int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The default concurrency level, + * used when not otherwise specified in a constructor. + */ + public static final int DEFAULT_CONCURRENCY_LEVEL = 16; + + /** + * Creates a new pool with the default initial capacity (16) + * and the default concurrency level (16). + * + * @param objectFactory the factory to supply new objects on demand + * + * @throws NullPointerException if {@code objectFactory} is null + */ + public ObjectPool(ObjectFactory<K, V> objectFactory) { + this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new pool with the given initial capacity + * and the default concurrency level (16). + * + * @param objectFactory the factory to supply new objects on demand + * @param initialCapacity the initial capacity to keep objects in the pool + * + * @throws NullPointerException if {@code objectFactory} is null + * @throws IllegalArgumentException if {@code initialCapacity} is negative + */ + public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) { + this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new pool with the given initial capacity + * and the given concurrency level. 
+ * + * @param objectFactory the factory to supply new objects on demand + * @param initialCapacity the initial capacity to keep objects in the pool + * @param concurrencyLevel the estimated count of concurrently accessing threads + * + * @throws NullPointerException if {@code objectFactory} is null + * @throws IllegalArgumentException if {@code initialCapacity} is negative or + * {@code concurrencyLevel} is non-positive + */ + public ObjectPool( + ObjectFactory<K, V> objectFactory, + int initialCapacity, + int concurrencyLevel) { + + if (objectFactory == null) { + throw new NullPointerException("Given object factory instance is NULL"); + } + this.objectFactory = objectFactory; + + this.referenceCache = + new ConcurrentHashMap<K, Reference<V>>(initialCapacity, 0.75f, concurrencyLevel); + } + + /** + * Removes stale references of shared objects from the pool. + * References newly becoming stale may still remain. + * The implementation of this method is expected to be lightweight + * when there is no stale reference. + */ + public abstract void purge(); + + /** + * Create a reference associated with the given object + * @param key the key to store in the reference + * @param obj the object to associate with + * @return the reference instance + */ + public abstract Reference<V> createReference(K key, V obj); + + /** + * Returns a shared object associated with the given {@code key}, + * which is identified by the {@code equals} method. 
+ * @throws NullPointerException if {@code key} is null + */ + public V get(K key) { + Reference<V> ref = referenceCache.get(key); + if (ref != null) { + V obj = ref.get(); + if (obj != null) { + return obj; + } + referenceCache.remove(key, ref); + } + + V newObj = objectFactory.createObject(key); + Reference<V> newRef = createReference(key, newObj); + while (true) { + Reference<V> existingRef = referenceCache.putIfAbsent(key, newRef); + if (existingRef == null) { + return newObj; + } + + V existingObject = existingRef.get(); + if (existingObject != null) { + return existingObject; + } + referenceCache.remove(key, existingRef); + } + } + + /** + * Returns an estimated count of objects kept in the pool. + * This also counts stale references, + * and you might want to call {@link #purge()} beforehand. + */ + public int size() { + return referenceCache.size(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java new file mode 100644 index 0000000..7f27f98 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.lang.ref.Reference; +import java.lang.ref.SoftReference; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ObjectPool.ObjectFactory; + +/** + * A {@code SoftReference} based shared object pool. + * The objects are kept in soft references and + * associated with keys which are identified by the {@code equals} method. + * The objects are created by {@link ObjectFactory} on demand. + * The object creation is expected to be lightweight, + * and the objects may be excessively created and discarded. + * Thread safe. + */ +@InterfaceAudience.Private +public class SoftObjectPool<K, V> extends ObjectPool<K, V> { + + public SoftObjectPool(ObjectFactory<K, V> objectFactory) { + super(objectFactory); + } + + public SoftObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) { + super(objectFactory, initialCapacity); + } + + public SoftObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity, + int concurrencyLevel) { + super(objectFactory, initialCapacity, concurrencyLevel); + } + + @Override + public void purge() { + // This method is lightweight while there is no stale reference + // with the Oracle (Sun) implementation of {@code ReferenceQueue}, + // because {@code ReferenceQueue.poll} just checks a volatile instance + // variable in {@code ReferenceQueue}. 
+ while (true) { + @SuppressWarnings("unchecked") + SoftObjectReference ref = (SoftObjectReference) staleRefQueue.poll(); + if (ref == null) { + break; + } + referenceCache.remove(ref.key, ref); + } + } + + @Override + public Reference<V> createReference(K key, V obj) { + return new SoftObjectReference(key, obj); + } + + private class SoftObjectReference extends SoftReference<V> { + final K key; + + SoftObjectReference(K key, V obj) { + super(obj, staleRefQueue); + this.key = key; + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java index 478864b..8529f01 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,15 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.util; -import java.lang.ref.ReferenceQueue; +import java.lang.ref.Reference; import java.lang.ref.WeakReference; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ObjectPool.ObjectFactory; /** * A {@code WeakReference} based shared object pool. @@ -35,116 +33,30 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; * Thread safe. 
*/ @InterfaceAudience.Private -public class WeakObjectPool<K, V> { - /** - * An {@code ObjectFactory} object is used to create - * new shared objects on demand. - */ - public interface ObjectFactory<K, V> { - /** - * Creates a new shared object associated with the given {@code key}, - * identified by the {@code equals} method. - * This method may be simultaneously called by multiple threads - * with the same key, and the excessive objects are just discarded. - */ - V createObject(K key); - } - - private final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>(); - - private class ObjectReference extends WeakReference<V> { - final K key; - - ObjectReference(K key, V obj) { - super(obj, staleRefQueue); - this.key = key; - } - } - - private final ObjectFactory<K, V> objectFactory; - - /** Does not permit null keys. */ - private final ConcurrentMap<K, ObjectReference> referenceCache; - - /** - * The default initial capacity, - * used when not otherwise specified in a constructor. - */ - public static final int DEFAULT_INITIAL_CAPACITY = 16; - - /** - * The default concurrency level, - * used when not otherwise specified in a constructor. - */ - public static final int DEFAULT_CONCURRENCY_LEVEL = 16; +public class WeakObjectPool<K,V> extends ObjectPool<K,V> { - /** - * Creates a new pool with the default initial capacity (16) - * and the default concurrency level (16). - * - * @param objectFactory the factory to supply new objects on demand - * - * @throws NullPointerException if {@code objectFactory} is null - */ public WeakObjectPool(ObjectFactory<K, V> objectFactory) { - this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL); + super(objectFactory); } - /** - * Creates a new pool with the given initial capacity - * and the default concurrency level (16). 
- * - * @param objectFactory the factory to supply new objects on demand - * @param initialCapacity the initial capacity to keep objects in the pool - * - * @throws NullPointerException if {@code objectFactory} is null - * @throws IllegalArgumentException if {@code initialCapacity} is negative - */ public WeakObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) { - this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL); + super(objectFactory, initialCapacity); } - /** - * Creates a new pool with the given initial capacity - * and the given concurrency level. - * - * @param objectFactory the factory to supply new objects on demand - * @param initialCapacity the initial capacity to keep objects in the pool - * @param concurrencyLevel the estimated count of concurrently accessing threads - * - * @throws NullPointerException if {@code objectFactory} is null - * @throws IllegalArgumentException if {@code initialCapacity} is negative or - * {@code concurrencyLevel} is non-positive - */ - public WeakObjectPool( - ObjectFactory<K, V> objectFactory, - int initialCapacity, + public WeakObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity, int concurrencyLevel) { - - if (objectFactory == null) { - throw new NullPointerException(); - } - this.objectFactory = objectFactory; - - this.referenceCache = new ConcurrentHashMap<>(initialCapacity, 0.75f, concurrencyLevel); - // 0.75f is the default load factor threshold of ConcurrentHashMap. + super(objectFactory, initialCapacity, concurrencyLevel); } - /** - * Removes stale references of shared objects from the pool. - * References newly becoming stale may still remain. - * The implementation of this method is expected to be lightweight - * when there is no stale reference. 
- */ + @Override public void purge() { // This method is lightweight while there is no stale reference // with the Oracle (Sun) implementation of {@code ReferenceQueue}, // because {@code ReferenceQueue.poll} just checks a volatile instance // variable in {@code ReferenceQueue}. - while (true) { @SuppressWarnings("unchecked") - ObjectReference ref = (ObjectReference)staleRefQueue.poll(); + WeakObjectReference ref = (WeakObjectReference) staleRefQueue.poll(); if (ref == null) { break; } @@ -152,43 +64,18 @@ public class WeakObjectPool<K, V> { } } - /** - * Returns a shared object associated with the given {@code key}, - * which is identified by the {@code equals} method. - * @throws NullPointerException if {@code key} is null - */ - public V get(K key) { - ObjectReference ref = referenceCache.get(key); - if (ref != null) { - V obj = ref.get(); - if (obj != null) { - return obj; - } - referenceCache.remove(key, ref); - } + @Override + public Reference<V> createReference(K key, V obj) { + return new WeakObjectReference(key, obj); + } - V newObj = objectFactory.createObject(key); - ObjectReference newRef = new ObjectReference(key, newObj); - while (true) { - ObjectReference existingRef = referenceCache.putIfAbsent(key, newRef); - if (existingRef == null) { - return newObj; - } + private class WeakObjectReference extends WeakReference<V> { + final K key; - V existingObject = existingRef.get(); - if (existingObject != null) { - return existingObject; - } - referenceCache.remove(key, existingRef); + WeakObjectReference(K key, V obj) { + super(obj, staleRefQueue); + this.key = key; } } - /** - * Returns an estimated count of objects kept in the pool. - * This also counts stale references, - * and you might want to call {@link #purge()} beforehand. 
- */ - public int size() { - return referenceCache.size(); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java index d9fefa2..9dbbbd0 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java @@ -31,12 +31,12 @@ import org.junit.experimental.categories.Category; @Category({MiscTests.class, SmallTests.class}) public class TestWeakObjectPool { - WeakObjectPool<String, Object> pool; + ObjectPool<String, Object> pool; @Before public void setUp() { pool = new WeakObjectPool<>( - new WeakObjectPool.ObjectFactory<String, Object>() { + new ObjectPool.ObjectFactory<String, Object>() { @Override public Object createObject(String key) { return new Object(); http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java index caf3265..deb2265 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java @@ -44,10 +44,11 @@ import com.google.common.annotations.VisibleForTesting; public class IdReadWriteLock { // The number of lock we want to easily support. It's not a maximum. 
private static final int NB_CONCURRENT_LOCKS = 1000; - // The pool to get entry from, entries are mapped by weak reference to make it able to be - // garbage-collected asap - private final WeakObjectPool<Long, ReentrantReadWriteLock> lockPool = new WeakObjectPool<>( - new WeakObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() { + // The pool to get entry from, entries are mapped by soft reference and will be + // automatically garbage-collected when JVM memory pressure is high + private final ObjectPool<Long, ReentrantReadWriteLock> lockPool = + new SoftObjectPool<>( + new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() { @Override public ReentrantReadWriteLock createObject(Long id) { return new ReentrantReadWriteLock(); http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java index 2ccfad8..295816f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java @@ -111,10 +111,11 @@ public class TestIdReadWriteLock { Future<Boolean> result = ecs.take(); assertTrue(result.get()); } - // make sure the entry pool will be cleared after GC and purge call + // make sure the entry pool won't be cleared when JVM memory is enough + // even after GC and purge call int entryPoolSize = idLock.purgeAndGetEntryPoolSize(); LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize); - assertEquals(0, entryPoolSize); + assertEquals(NUM_IDS, entryPoolSize); } finally { exec.shutdown(); exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
