POOL-303 Ensure that threads do not block indefinitely if more than maxTotal threads try to borrow an object at the same time and the factory fails to create any objects.
git-svn-id: https://svn.apache.org/repos/asf/commons/proper/pool/trunk@1735161 13f79535-47bb-0310-9956-ffa450edef68 Project: http://git-wip-us.apache.org/repos/asf/commons-pool/repo Commit: http://git-wip-us.apache.org/repos/asf/commons-pool/commit/fa819eb4 Tree: http://git-wip-us.apache.org/repos/asf/commons-pool/tree/fa819eb4 Diff: http://git-wip-us.apache.org/repos/asf/commons-pool/diff/fa819eb4 Branch: refs/heads/master Commit: fa819eb4be47b7078ef973cbc37df38e7df6eb1f Parents: a898de8 Author: Mark Thomas <ma...@apache.org> Authored: Tue Mar 15 20:17:51 2016 +0000 Committer: Mark Thomas <ma...@apache.org> Committed: Tue Mar 15 20:17:51 2016 +0000 ---------------------------------------------------------------------- src/changes/changes.xml | 5 ++ .../pool2/impl/GenericKeyedObjectPool.java | 18 +++- .../commons/pool2/impl/GenericObjectPool.java | 6 ++ .../pool2/impl/TestGenericObjectPool.java | 93 +++++++++++++++++--- 4 files changed, 107 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/commons-pool/blob/fa819eb4/src/changes/changes.xml ---------------------------------------------------------------------- diff --git a/src/changes/changes.xml b/src/changes/changes.xml index f515689..a1293f7 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -51,6 +51,11 @@ The <action> type attribute can be add,update,fix,remove. Ensure BaseGenericObjectPool.IdentityWrapper#equals() follows the expected contract for equals(). </action> + <action dev="markt" issue="POOL-303" type="fix"> + Ensure that threads do not block indefinitely if more than maxTotal + threads try to borrow an object at the same time and the factory fails to + create any objects. + </action> </release> <release version="2.4.2" date="2015-08-01" description= "This is a patch release, including bug fixes only."> http://git-wip-us.apache.org/repos/asf/commons-pool/blob/fa819eb4/src/main/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java b/src/main/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java index eb335d6..697ae05 100644 --- a/src/main/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java +++ b/src/main/java/org/apache/commons/pool2/impl/GenericKeyedObjectPool.java @@ -476,7 +476,7 @@ public class GenericKeyedObjectPool<K,T> extends BaseGenericObjectPool<T> throw new IllegalStateException( "Returned object not currently part of this pool"); } - + synchronized(p) { final PooledObjectState state = p.getState(); if (state != PooledObjectState.ALLOCATED) { @@ -907,7 +907,7 @@ public class GenericKeyedObjectPool<K,T> extends BaseGenericObjectPool<T> if (objectDeque == null) { continue; } - + final Deque<PooledObject<T>> idleObjects = objectDeque.getIdleObjects(); evictionIterator = new EvictionIterator(idleObjects); if (evictionIterator.hasNext()) { @@ -1004,6 +1004,8 @@ public class GenericKeyedObjectPool<K,T> extends BaseGenericObjectPool<T> final int maxTotalPerKeySave = getMaxTotalPerKey(); // Per key final int maxTotal = getMaxTotal(); // All keys + final ObjectDeque<T> objectDeque = poolMap.get(key); + // Check against the overall limit boolean loop = true; @@ -1012,6 +1014,9 @@ public class GenericKeyedObjectPool<K,T> extends BaseGenericObjectPool<T> if (maxTotal > -1 && newNumTotal > maxTotal) { numTotal.decrementAndGet(); if (getNumIdle() == 0) { + // POOL-303. There may be threads waiting on an object + // return that isn't going to happen. Unblock them. + objectDeque.idleObjects.interuptTakeWaiters(); return null; } clearOldest(); @@ -1020,7 +1025,6 @@ public class GenericKeyedObjectPool<K,T> extends BaseGenericObjectPool<T> } } - final ObjectDeque<T> objectDeque = poolMap.get(key); final long newCreateCount = objectDeque.getCreateCount().incrementAndGet(); // Check against the per key limit @@ -1028,6 +1032,9 @@ public class GenericKeyedObjectPool<K,T> extends BaseGenericObjectPool<T> newCreateCount > Integer.MAX_VALUE) { numTotal.decrementAndGet(); objectDeque.getCreateCount().decrementAndGet(); + // POOL-303. There may be threads waiting on an object return that + // isn't going to happen. Unblock them. + objectDeque.idleObjects.interuptTakeWaiters(); return null; } @@ -1038,6 +1045,9 @@ public class GenericKeyedObjectPool<K,T> extends BaseGenericObjectPool<T> } catch (final Exception e) { numTotal.decrementAndGet(); objectDeque.getCreateCount().decrementAndGet(); + // POOL-303. There may be threads waiting on an object return that + // isn't going to happen. Unblock them. + objectDeque.idleObjects.interuptTakeWaiters(); throw e; } @@ -1433,7 +1443,7 @@ public class GenericKeyedObjectPool<K,T> extends BaseGenericObjectPool<T> /* * The map is keyed on pooled instances, wrapped to ensure that - * they work properly as keys. + * they work properly as keys. */ private final Map<IdentityWrapper<S>, PooledObject<S>> allObjects = new ConcurrentHashMap<IdentityWrapper<S>, PooledObject<S>>(); http://git-wip-us.apache.org/repos/asf/commons-pool/blob/fa819eb4/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java b/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java index 6c8dc71..b5ef9bd 100644 --- a/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java +++ b/src/main/java/org/apache/commons/pool2/impl/GenericObjectPool.java @@ -856,6 +856,9 @@ public class GenericObjectPool<T> extends BaseGenericObjectPool<T> if (localMaxTotal > -1 && newCreateCount > localMaxTotal || newCreateCount > Integer.MAX_VALUE) { createCount.decrementAndGet(); + // POOL-303. There may be threads waiting on an object return that + // isn't going to happen. Unblock them. + idleObjects.interuptTakeWaiters(); return null; } @@ -864,6 +867,9 @@ public class GenericObjectPool<T> extends BaseGenericObjectPool<T> p = factory.makeObject(); } catch (final Exception e) { createCount.decrementAndGet(); + // POOL-303. There may be threads waiting on an object return that + // isn't going to happen. Unblock them. + idleObjects.interuptTakeWaiters(); throw e; } http://git-wip-us.apache.org/repos/asf/commons-pool/blob/fa819eb4/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java b/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java index a694106..f951691 100644 --- a/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java +++ b/src/test/java/org/apache/commons/pool2/impl/TestGenericObjectPool.java @@ -33,6 +33,7 @@ import java.util.Random; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; @@ -2397,11 +2398,11 @@ public class TestGenericObjectPool extends TestBaseObjectPool { Assert.assertEquals(1, factory.validateCounter); } - + /** * Verifies that when a factory's makeObject produces instances that are not - * discernible by equals, the pool can handle them. - * + * discernible by equals, the pool can handle them. + * * JIRA: POOL-283 */ @Test @@ -2415,11 +2416,11 @@ public class TestGenericObjectPool extends TestBaseObjectPool { pool.returnObject(s2); pool.close(); } - + /** * Verifies that when a borrowed object is mutated in a way that does not * preserve equality and hashcode, the pool can recognized it on return. - * + * * JIRA: POOL-284 */ @Test @@ -2435,11 +2436,11 @@ public class TestGenericObjectPool extends TestBaseObjectPool { pool.returnObject(s2); pool.close(); } - + /** * Verifies that returning an object twice (without borrow in between) causes ISE * but does not re-validate or re-passivate the instance. - * + * * JIRA: POOL-285 */ @Test @@ -2460,7 +2461,7 @@ public class TestGenericObjectPool extends TestBaseObjectPool { Assert.assertEquals(1, waiter.getPassivationCount()); } } - + public void testPreparePool() throws Exception { pool.setMinIdle(1); pool.setMaxTotal(1); @@ -2486,13 +2487,13 @@ public class TestGenericObjectPool extends TestBaseObjectPool { return new DefaultPooledObject<Object>(value); } } - - /** + + /** * Factory that creates HashSets. Note that this means * 0) All instances are initially equal (not discernible by equals) * 1) Instances are mutable and mutation can cause change in identity / hashcode. */ - private static final class HashSetFactory + private static final class HashSetFactory extends BasePooledObjectFactory<HashSet<String>> { @Override public HashSet<String> create() throws Exception { @@ -2544,4 +2545,74 @@ public class TestGenericObjectPool extends TestBaseObjectPool { } } } + + @Test + public void testFailingFactoryDoesNotBlockThreads() throws Exception { + + final CreateFailFactory factory = new CreateFailFactory(); + final GenericObjectPool<String> createFailFactoryPool = + new GenericObjectPool<String>(factory); + + createFailFactoryPool.setMaxTotal(1); + + // Try and borrow the first object from the pool + final WaitingTestThread thread1 = new WaitingTestThread(createFailFactoryPool, 0); + thread1.start(); + + // Wait for thread to reach semaphore + while(!factory.hasQueuedThreads()) { + Thread.sleep(200); + } + + // Try and borrow the second object from the pool + final WaitingTestThread thread2 = new WaitingTestThread(createFailFactoryPool, 0); + thread2.start(); + // Pool will not call factory since maximum number of object creations + // are already queued. + + // Thread 2 will wait on an object being returned to the pool + // Give thread 2 a chance to reach this state + Thread.sleep(1000); + + // Release thread1 + factory.release(); + // Pre-release thread2 + factory.release(); + + // Both threads should now complete. + boolean threadRunning = true; + int count = 0; + while (threadRunning && count < 15) { + threadRunning = thread1.isAlive(); + threadRunning = thread2.isAlive(); + Thread.sleep(200); + count++; + } + Assert.assertFalse(thread1.isAlive()); + Assert.assertFalse(thread2.isAlive()); + } + + private static class CreateFailFactory extends BasePooledObjectFactory<String> { + + private final Semaphore semaphore = new Semaphore(0); + + @Override + public String create() throws Exception { + semaphore.acquire(); + throw new Exception(); + } + + @Override + public PooledObject<String> wrap(String obj) { + return new DefaultPooledObject<String>(obj); + } + + public void release() { + semaphore.release(); + } + + public boolean hasQueuedThreads() { + return semaphore.hasQueuedThreads(); + } + } }