Author: markt
Date: Fri May 13 19:05:20 2011
New Revision: 1102868
URL: http://svn.apache.org/viewvc?rev=1102868&view=rev
Log:
Add the ability to register an interest in a key to prevent the associated
ObjectDeque being removed from under methods that expect it to remain available
for the life of the method.
Fixes the issues found by commons-performance
Modified:
commons/proper/pool/trunk/src/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java
Modified:
commons/proper/pool/trunk/src/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java
URL:
http://svn.apache.org/viewvc/commons/proper/pool/trunk/src/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java?rev=1102868&r1=1102867&r2=1102868&view=diff
==============================================================================
---
commons/proper/pool/trunk/src/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java
(original)
+++
commons/proper/pool/trunk/src/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java
Fri May 13 19:05:20 2011
@@ -28,6 +28,10 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.pool2.BaseKeyedObjectPool;
import org.apache.commons.pool2.KeyedPoolableObjectFactory;
@@ -1025,103 +1029,106 @@ public class GenericKeyedObjectPool<K,T>
long maxWait = _maxWait;
boolean create;
- ObjectDeque<T> objectDeque = poolMap.get(key);
+ ObjectDeque<T> objectDeque = register(key);
- while (p == null) {
- create = false;
- if (whenExhaustedAction == WhenExhaustedAction.FAIL) {
- if (objectDeque != null) {
- p = objectDeque.getIdleObjects().pollFirst();
- }
- if (p == null) {
- create = true;
- p = create(key, false);
- }
- if (p == null) {
- throw new NoSuchElementException("Pool exhausted");
- }
- if (!p.allocate()) {
- p = null;
- }
- } else if (whenExhaustedAction == WhenExhaustedAction.BLOCK) {
- if (objectDeque != null) {
- p = objectDeque.getIdleObjects().pollFirst();
- }
- if (p == null) {
- create = true;
- p = create(key, false);
- }
- if (p == null && objectDeque != null) {
- if (maxWait < 1) {
- p = objectDeque.getIdleObjects().takeFirst();
- } else {
- p = objectDeque.getIdleObjects().pollFirst(maxWait,
- TimeUnit.MILLISECONDS);
+ try {
+ while (p == null) {
+ create = false;
+ if (whenExhaustedAction == WhenExhaustedAction.FAIL) {
+ if (objectDeque != null) {
+ p = objectDeque.getIdleObjects().pollFirst();
}
- }
- if (p == null) {
- throw new NoSuchElementException(
- "Timeout waiting for idle object");
- }
- if (!p.allocate()) {
- p = null;
- }
- } else if (whenExhaustedAction == WhenExhaustedAction.GROW) {
- if (objectDeque != null) {
- p = objectDeque.getIdleObjects().pollFirst();
- }
- if (p == null) {
- create = true;
- p = create(key, true);
- }
- if (p != null && !p.allocate()) {
- p = null;
- }
- }
-
- if (p != null) {
- try {
- _factory.activateObject(key, p.getObject());
- } catch (Exception e) {
- try {
- destroy(key, p);
- } catch (Exception e1) {
- // Ignore - activation failure is more important
+ if (p == null) {
+ create = true;
+ p = create(key, false);
+ }
+ if (p == null) {
+ throw new NoSuchElementException("Pool exhausted");
+ }
+ if (!p.allocate()) {
+ p = null;
+ }
+ } else if (whenExhaustedAction == WhenExhaustedAction.BLOCK) {
+ if (objectDeque != null) {
+ p = objectDeque.getIdleObjects().pollFirst();
+ }
+ if (p == null) {
+ create = true;
+ p = create(key, false);
+ }
+ if (p == null && objectDeque != null) {
+ if (maxWait < 1) {
+ p = objectDeque.getIdleObjects().takeFirst();
+ } else {
+ p = objectDeque.getIdleObjects().pollFirst(maxWait,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+ if (p == null) {
+ throw new NoSuchElementException(
+ "Timeout waiting for idle object");
+ }
+ if (!p.allocate()) {
+ p = null;
+ }
+ } else if (whenExhaustedAction == WhenExhaustedAction.GROW) {
+ if (objectDeque != null) {
+ p = objectDeque.getIdleObjects().pollFirst();
+ }
+ if (p == null) {
+ create = true;
+ p = create(key, true);
}
- p = null;
- if (create) {
- NoSuchElementException nsee = new
NoSuchElementException(
- "Unable to activate object");
- nsee.initCause(e);
- throw nsee;
+ if (p != null && !p.allocate()) {
+ p = null;
}
}
- if (p != null && getTestOnBorrow()) {
- boolean validate = false;
- Throwable validationThrowable = null;
+
+ if (p != null) {
try {
- validate = _factory.validateObject(key, p.getObject());
- } catch (Throwable t) {
- PoolUtils.checkRethrow(t);
- }
- if (!validate) {
+ _factory.activateObject(key, p.getObject());
+ } catch (Exception e) {
try {
destroy(key, p);
- } catch (Exception e) {
- // Ignore - validation failure is more important
+ } catch (Exception e1) {
+ // Ignore - activation failure is more important
}
p = null;
if (create) {
NoSuchElementException nsee = new
NoSuchElementException(
- "Unable to validate object");
- nsee.initCause(validationThrowable);
+ "Unable to activate object");
+ nsee.initCause(e);
throw nsee;
}
}
+ if (p != null && getTestOnBorrow()) {
+ boolean validate = false;
+ Throwable validationThrowable = null;
+ try {
+ validate = _factory.validateObject(key,
p.getObject());
+ } catch (Throwable t) {
+ PoolUtils.checkRethrow(t);
+ }
+ if (!validate) {
+ try {
+ destroy(key, p);
+ } catch (Exception e) {
+ // Ignore - validation failure is more
important
+ }
+ p = null;
+ if (create) {
+ NoSuchElementException nsee = new
NoSuchElementException(
+ "Unable to validate object");
+ nsee.initCause(validationThrowable);
+ throw nsee;
+ }
+ }
+ }
}
}
+ } finally {
+ deregister(key);
}
-
return p.getObject();
}
@@ -1258,22 +1265,28 @@ public class GenericKeyedObjectPool<K,T>
@Override
public void clear(K key) {
- ObjectDeque<T> objectDeque = poolMap.get(key);
- if (objectDeque == null) {
- return;
- }
- LinkedBlockingDeque<PooledObject<T>> idleObjects =
- objectDeque.getIdleObjects();
+ register(key);
- PooledObject<T> p = idleObjects.poll();
-
- while (p != null) {
- try {
- destroy(key, p);
- } catch (Exception e) {
- // TODO - Ignore?
+ try {
+ ObjectDeque<T> objectDeque = poolMap.get(key);
+ if (objectDeque == null) {
+ return;
+ }
+ LinkedBlockingDeque<PooledObject<T>> idleObjects =
+ objectDeque.getIdleObjects();
+
+ PooledObject<T> p = idleObjects.poll();
+
+ while (p != null) {
+ try {
+ destroy(key, p);
+ } catch (Exception e) {
+ // TODO - Ignore?
+ }
+ p = idleObjects.poll();
}
- p = idleObjects.poll();
+ } finally {
+ deregister(key);
}
}
@@ -1563,26 +1576,11 @@ public class GenericKeyedObjectPool<K,T>
}
}
- // Make sure the key exists in the poolMap
- ObjectDeque<T> objectDeque;
- int newNumActive;
- synchronized (poolMap) {
- // This all has to be in the sync block to ensure that the key is
- // not removed by destroy
- objectDeque = poolMap.get(key);
- if (objectDeque == null) {
- objectDeque = new ObjectDeque<T>();
- newNumActive = objectDeque.getNumActive().incrementAndGet();
- poolMap.put(key, objectDeque);
- poolKeyList.add(key);
- } else {
- newNumActive = objectDeque.getNumActive().incrementAndGet();
- }
- }
+ ObjectDeque<T> objectDeque = poolMap.get(key);
+ int newNumActive = objectDeque.getNumActive().incrementAndGet();
// Check against the per key limit
if (!force && maxActive > -1 && newNumActive > maxActive) {
- cleanObjectDeque(key, objectDeque);
numTotal.decrementAndGet();
return null;
}
@@ -1592,7 +1590,6 @@ public class GenericKeyedObjectPool<K,T>
try {
t = _factory.makeObject(key);
} catch (Exception e) {
- cleanObjectDeque(key, objectDeque);
numTotal.decrementAndGet();
throw e;
}
@@ -1603,33 +1600,76 @@ public class GenericKeyedObjectPool<K,T>
}
private void destroy(K key, PooledObject<T> toDestory) throws Exception {
-
- ObjectDeque<T> objectDeque = poolMap.get(key);
- objectDeque.getIdleObjects().remove(toDestory);
- objectDeque.getAllObjects().remove(toDestory.getObject());
+
+ register(key);
+
+ try {
+ ObjectDeque<T> objectDeque = poolMap.get(key);
+ objectDeque.getIdleObjects().remove(toDestory);
+ objectDeque.getAllObjects().remove(toDestory.getObject());
+
+ try {
+ _factory.destroyObject(key, toDestory.getObject());
+ } finally {
+ objectDeque.getNumActive().decrementAndGet();
+ numTotal.decrementAndGet();
+ }
+ } finally {
+ deregister(key);
+ }
+ }
+ private ObjectDeque<T> register(K k) {
+ Lock lock = keyLock.readLock();
+ ObjectDeque<T> objectDeque = null;
try {
- _factory.destroyObject(key, toDestory.getObject());
+ lock.lock();
+ objectDeque = poolMap.get(k);
+ if (objectDeque == null) {
+ // Upgrade to write lock
+ lock.unlock();
+ lock = keyLock.writeLock();
+ lock.lock();
+ objectDeque = poolMap.get(k);
+ if (objectDeque == null) {
+ objectDeque = new ObjectDeque<T>();
+ objectDeque.getNumInterested().incrementAndGet();
+ poolMap.put(k, objectDeque);
+ poolKeyList.add(k);
+ } else {
+ objectDeque.getNumInterested().incrementAndGet();
+ }
+ } else {
+ objectDeque.getNumInterested().incrementAndGet();
+ }
} finally {
- cleanObjectDeque(key, objectDeque);
- numTotal.decrementAndGet();
+ lock.unlock();
}
+ return objectDeque;
}
+
+ private void deregister(K k) {
+ ObjectDeque<T> objectDeque;
- private void cleanObjectDeque(K key, ObjectDeque<T> objectDeque) {
- int newNumActive = objectDeque.getNumActive().decrementAndGet();
- if (newNumActive == 0) {
- synchronized (poolMap) {
- newNumActive = objectDeque.getNumActive().get();
- if (newNumActive == 0) {
- poolMap.remove(key);
- poolKeyList.remove(key);
+ // TODO Think carefully about when a read lock is required
+ objectDeque = poolMap.get(k);
+ long numInterested = objectDeque.getNumInterested().decrementAndGet();
+ if (numInterested == 0 && objectDeque.getNumActive().get() == 0) {
+ // Potential to remove key
+ Lock lock = keyLock.writeLock();
+ lock.lock();
+ try {
+ if (objectDeque.getNumActive().get() == 0 &&
+ objectDeque.getNumInterested().get() == 0) {
+ poolMap.remove(k);
+ poolKeyList.remove(k);
}
+ } finally {
+ lock.unlock();
}
}
}
-
/**
* Iterates through all the known keys and creates any necessary objects
to maintain
* the minimum level of pooled objects.
@@ -1726,8 +1766,13 @@ public class GenericKeyedObjectPool<K,T>
if (_factory == null) {
throw new IllegalStateException("Cannot add objects without a
factory.");
}
- PooledObject<T> p = create(key, false);
- addIdleObject(key, p);
+ register(key);
+ try {
+ PooledObject<T> p = create(key, false);
+ addIdleObject(key, p);
+ } finally {
+ deregister(key);
+ }
}
/**
@@ -1843,6 +1888,8 @@ public class GenericKeyedObjectPool<K,T>
private Map<S, PooledObject<S>> allObjects =
new ConcurrentHashMap<S, PooledObject<S>>();
+ private AtomicLong numInterested = new AtomicLong(0);
+
public LinkedBlockingDeque<PooledObject<S>> getIdleObjects() {
return idleObjects;
}
@@ -1851,6 +1898,10 @@ public class GenericKeyedObjectPool<K,T>
return numActive;
}
+ public AtomicLong getNumInterested() {
+ return numInterested;
+ }
+
public Map<S, PooledObject<S>> getAllObjects() {
return allObjects;
}
@@ -2104,6 +2155,9 @@ public class GenericKeyedObjectPool<K,T>
/** List of pool keys - used to control eviction order */
private List<K> poolKeyList = new ArrayList<K>();
+ /** Lock used to manage adding/removing of keys */
+ private ReadWriteLock keyLock = new ReentrantReadWriteLock(true);
+
/**
* The combined count of the currently active objects for all keys and
those
* in the process of being created. Under load, it may exceed