Repository: cayenne Updated Branches: refs/heads/master 5d75b6907 -> 05e55337b
CAY-2009 Non-blocking connection pool * ensure consistent auto-commit state regardless of whether the connection is new or previously pooled (this was the real cause of H2 failures) Project: http://git-wip-us.apache.org/repos/asf/cayenne/repo Commit: http://git-wip-us.apache.org/repos/asf/cayenne/commit/05e55337 Tree: http://git-wip-us.apache.org/repos/asf/cayenne/tree/05e55337 Diff: http://git-wip-us.apache.org/repos/asf/cayenne/diff/05e55337 Branch: refs/heads/master Commit: 05e55337ba297ec08275f14a9f4b11f50a16918a Parents: 5d75b69 Author: aadamchik <[email protected]> Authored: Sun May 10 13:42:39 2015 -0400 Committer: aadamchik <[email protected]> Committed: Sun May 10 13:42:42 2015 -0400 ---------------------------------------------------------------------- .../cayenne/datasource/PoolAwareConnection.java | 2 - .../cayenne/datasource/PoolingDataSource.java | 41 ++++++++++++++++---- .../cayenne/datasource/PoolingDataSourceIT.java | 36 +++++++++++++++-- 3 files changed, 67 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cayenne/blob/05e55337/cayenne-server/src/main/java/org/apache/cayenne/datasource/PoolAwareConnection.java ---------------------------------------------------------------------- diff --git a/cayenne-server/src/main/java/org/apache/cayenne/datasource/PoolAwareConnection.java b/cayenne-server/src/main/java/org/apache/cayenne/datasource/PoolAwareConnection.java index da1e3d9..b7cbfa0 100644 --- a/cayenne-server/src/main/java/org/apache/cayenne/datasource/PoolAwareConnection.java +++ b/cayenne-server/src/main/java/org/apache/cayenne/datasource/PoolAwareConnection.java @@ -51,8 +51,6 @@ public class PoolAwareConnection implements Connection { private Connection connection; private String validationQuery; - - public PoolAwareConnection(PoolingDataSource parent, Connection connection, String validationQuery) { this.parent = parent; this.connection = connection; http://git-wip-us.apache.org/repos/asf/cayenne/blob/05e55337/cayenne-server/src/main/java/org/apache/cayenne/datasource/PoolingDataSource.java ---------------------------------------------------------------------- diff --git a/cayenne-server/src/main/java/org/apache/cayenne/datasource/PoolingDataSource.java b/cayenne-server/src/main/java/org/apache/cayenne/datasource/PoolingDataSource.java index 88ece22..f760ec1 100644 --- a/cayenne-server/src/main/java/org/apache/cayenne/datasource/PoolingDataSource.java +++ b/cayenne-server/src/main/java/org/apache/cayenne/datasource/PoolingDataSource.java @@ -88,6 +88,7 @@ public class PoolingDataSource implements DataSource { private int maxIdleConnections; private int minConnections; + private int maxConnections; private String validationQuery; static int maxIdleConnections(int min, int max) { @@ -117,6 +118,7 @@ public class PoolingDataSource implements DataSource { this.maxQueueWaitTime = parameters.getMaxQueueWaitTime(); this.validationQuery = parameters.getValidationQuery(); this.minConnections = minConnections; + this.maxConnections = maxConnections; this.pool = new ConcurrentHashMap<PoolAwareConnection, Object>((int) (maxConnections / 0.75)); this.available = new ArrayBlockingQueue<PoolAwareConnection>(maxConnections); this.poolCap = new Semaphore(maxConnections); @@ -204,6 +206,8 @@ public class PoolingDataSource implements DataSource { */ void reclaim(PoolAwareConnection connection) { + // TODO: rollback any in-process tx? + // the queue may overflow potentially and we won't be able to add the // object if (!available.offer(connection)) { @@ -265,12 +269,16 @@ public class PoolingDataSource implements DataSource { } /** - * Creates a new connection in a consistent state. + * Creates a new connection. */ Connection createUnwrapped() throws SQLException { - Connection c = nonPoolingDataSource.getConnection(); + return nonPoolingDataSource.getConnection(); + } - // set default connection state... + /** + * Updates connection state to a default state. + */ + Connection resetState(Connection c) throws SQLException { // TODO: tx isolation level? @@ -284,7 +292,6 @@ public class PoolingDataSource implements DataSource { } c.clearWarnings(); - return c; } @@ -300,17 +307,17 @@ public class PoolingDataSource implements DataSource { c = uncheckNonBlocking(true); if (c != null) { - return c; + return resetState(c); } c = createUnchecked(); if (c != null) { - return c; + return resetState(c); } c = uncheckBlocking(true); if (c != null) { - return c; + return resetState(c); } throw new SQLException("Can't obtain connection. Request to pool timed out. Total pool size: " + pool.size()); @@ -357,4 +364,24 @@ public class PoolingDataSource implements DataSource { public Logger getParentLogger() throws SQLFeatureNotSupportedException { throw new SQLFeatureNotSupportedException(); } + + String getValidationQuery() { + return validationQuery; + } + + long getMaxQueueWaitTime() { + return maxQueueWaitTime; + } + + int getMaxIdleConnections() { + return maxIdleConnections; + } + + int getMinConnections() { + return minConnections; + } + + int getMaxConnections() { + return maxConnections; + } } http://git-wip-us.apache.org/repos/asf/cayenne/blob/05e55337/cayenne-server/src/test/java/org/apache/cayenne/datasource/PoolingDataSourceIT.java ---------------------------------------------------------------------- diff --git a/cayenne-server/src/test/java/org/apache/cayenne/datasource/PoolingDataSourceIT.java b/cayenne-server/src/test/java/org/apache/cayenne/datasource/PoolingDataSourceIT.java index 7052281..342300c 100644 --- a/cayenne-server/src/test/java/org/apache/cayenne/datasource/PoolingDataSourceIT.java +++ b/cayenne-server/src/test/java/org/apache/cayenne/datasource/PoolingDataSourceIT.java @@ -27,6 +27,8 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -47,11 +49,39 @@ public class PoolingDataSourceIT extends BasePoolingDataSourceIT { @Test public void testGetConnectionAutoCommit() throws Exception { - Connection c1 = dataSource.getConnection(); + assertTrue(dataSource.getMaxConnections() > 0); + + List<Connection> connections = new ArrayList<Connection>(); try { - assertTrue("Failed to reset connection state", c1.getAutoCommit()); + + for (int i = 0; i < dataSource.getMaxConnections(); i++) { + Connection c = dataSource.getConnection(); + assertTrue("Failed to reset connection state", c.getAutoCommit()); + connections.add(c); + } + + for (Connection c : connections) { + c.setAutoCommit(false); + c.close(); + } + + for (int i = 0; i < dataSource.getMaxConnections(); i++) { + Connection c = dataSource.getConnection(); + + // presumably this pass through the pool should return existing + // connections + assertTrue(connections.contains(c)); + assertTrue("Failed to reset connection state for reused connection", c.getAutoCommit()); + } + } finally { - c1.close(); + for (Connection c : connections) { + try { + c.close(); + } catch (SQLException e) { + + } + } } }
