Author: fhanik
Date: Tue Nov 25 17:14:32 2008
New Revision: 720693

URL: http://svn.apache.org/viewvc?rev=720693&view=rev
Log:
Add the ability to retrieve connection object asynchronously


Added:
    
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/SimplePOJOAsyncExample.java
Modified:
    tomcat/trunk/modules/jdbc-pool/doc/jdbc-pool.xml
    
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java
    
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/DataSourceFactory.java
    
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/DataSourceProxy.java
    
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/DefaultTestCase.java
    
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/FairnessTest.java
    
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TwoDataSources.java

Modified: tomcat/trunk/modules/jdbc-pool/doc/jdbc-pool.xml
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/doc/jdbc-pool.xml?rev=720693&r1=720692&r2=720693&view=diff
==============================================================================
--- tomcat/trunk/modules/jdbc-pool/doc/jdbc-pool.xml (original)
+++ tomcat/trunk/modules/jdbc-pool/doc/jdbc-pool.xml Tue Nov 25 17:14:32 2008
@@ -48,7 +48,9 @@
       <li>commons-dbcp uses static interfaces. This means you can't compile it 
with JDK 1.6, or if you run on JDK 1.6/1.7 you will get 
           NoSuchMethodException for all the methods not implemented, even if 
the driver supports it.  </li>
       <li>The commons-dbcp has become fairly stagnant. Sparse updates, 
releases, and new feature support.</li>
-      <li>It's not worth rewriting over 60 classes, when something as a 
connection pool can be accomplished with as a much simpler implementation.</li> 
+      <li>It's not worth rewriting over 60 classes, when something as a 
connection pool can be accomplished with as a much simpler implementation.</li>
+      <li>Tomcat jdbc pool implements a fairness option not available in 
commons-dbcp and still performs faster than commons-dbcp</li>
+      <li>Tomcat jdbc pool implements the ability retrieve a connection 
asynchronously, without adding additional threads to the library itself</li> 
       <li>Tomcat jdbc pool is a Tomcat module, it depends on Tomcat JULI, a 
simplified logging framework used in Tomcat.</li>
     </ol>
   </p>
@@ -70,6 +72,7 @@
       <li>Extremely simple, due to the very simplified implementation, the 
line count and source file count are very low, compare with c3p0 
           that has over 200 source files(last time we checked), Tomcat jdbc 
has a core of 8 files, the connection pool itself is about half 
           that. As bugs may occur, they will be faster to track down, and 
easier to fix. Complexity reduction has been a focus from inception.</li>
+      <li>Asynchronous connection retrieval - you can queue your request for a 
connection and receive a Future&lt;Connection&gt; back.</li>    
     </ol>
   </p>
 
@@ -87,6 +90,7 @@
       <li><code>validationInterval</code> - in addition to running validations 
on connections, avoid running them too frequently.</li>
       <li><code>jdbcInterceptors</code> - flexible and pluggable interceptors 
to create any customizations around the pool, 
           the query execution and the result set handling. More on this in the 
advanced section.</li>
+      <li><code>fairQueue</code> - Set the fair flag to true to achieve thread 
fairness or to use asynchronous connection retrieval</li>
     </ul>    
   </subsection>
   <subsection name="Inside the Apache Tomcat Container">
@@ -322,6 +326,7 @@
       <p>(boolean) Set to true if you wish that calls to getConnection should 
be treated
          fairly in a true FIFO fashion. This uses the 
<code>org.apache.tomcat.jdbc.pool.FairBlockingQueue</code> 
          implementation for the list of the idle connections. The default 
value is <code>false</code>.
+         This flag is required when you want to use asynchronous connection 
retrieval.
       </p>
     </attribute>
     <attribute name="useEquals" required="false">
@@ -437,6 +442,32 @@
     </source>
   
   </subsection>
+  <subsection name="Asynchronous Connection Retrieval">
+    <p> The Tomcat JDBC connection pool supports asynchronous connection 
retrieval without adding additional threads to the 
+        pool library. It does this by adding a method to the data source 
called <code>Future&lt;Connection&gt; getConnectionAsync()</code>.
+        In order to use the async retrieval, two conditions must be met:<br/>
+        1. You must configure the <code>fairQueue</code> property to be 
<code>true</code>.<br/>
+        2. You will have to cast the data source to 
<code>org.apache.tomcat.jdbc.pool.DataSource</code><br/>
+        An example of using the async feature is show below.
+      <source>
+      
+        Connection con = null;
+        try {            
+          Future&lt;Connection&gt; future = datasource.getConnectionAsync();
+          while (!future.isDone()) {
+              System.out.println("Connection is not yet available. Do some 
background work");
+              try {
+                  Thread.sleep(100); //simulate work
+              }catch (InterruptedException x) {
+                  Thread.currentThread().interrupted();
+              }
+          }
+          con = future.get(); //should return instantly 
+          Statement st = con.createStatement();
+          ResultSet rs = st.executeQuery("select * from user");
+      </source>
+    </p>
+  </subsection>
   <subsection name="Interceptors">
     <p>Interceptors are a powerful way to enable, disable or modify 
functionality on a specific connection or its sub components.
        There are many different use cases for when interceptors are useful. By 
default, and for performance reasons, the connection pool is stateless.
@@ -499,6 +530,7 @@
     </p>   
   </subsection>
   
+  
 </section>
 
 

Modified: 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java?rev=720693&r1=720692&r2=720693&view=diff
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java
 (original)
+++ 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java
 Tue Nov 25 17:14:32 2008
@@ -28,14 +28,18 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 
 import org.apache.tomcat.jdbc.pool.jmx.ConnectionPoolMBean;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.InstanceAlreadyExistsException;
@@ -113,7 +117,12 @@
 
 
     public Future<Connection> getConnectionAsync() throws SQLException {
-        return null;
+        if (idle instanceof FairBlockingQueue) {
+            Future<PooledConnection> pcf = 
((FairBlockingQueue<PooledConnection>)idle).pollAsync();
+            return new ConnectionFuture(pcf);
+        } else {
+            throw new SQLException("Connection pool is misconfigured, doesn't 
support async retrieval. Set the 'fair' property to 'true'");
+        }
     }
     
     /**
@@ -726,7 +735,71 @@
             log.warn("Unable to stop JMX integration for connection pool. 
Instance["+getName()+"].",x);
         }
     }
+    
+    /**
+     * Tread safe wrapper around a future for the regular queue
+     * This one retrieves the pooled connection object
+     * and performs the initialization according to 
+     * interceptors and validation rules.
+     * This class is thread safe.
+     * @author fhanik
+     *
+     */
+    protected class ConnectionFuture implements Future<Connection> {
+        Future<PooledConnection> pcFuture = null;
+        AtomicBoolean configured = new AtomicBoolean(false);
+        CountDownLatch latch = new CountDownLatch(1);
+        Connection result = null;
+        SQLException cause = null;
+        public ConnectionFuture(Future<PooledConnection> pcf) {
+            this.pcFuture = pcf;
+        }
+        
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return pcFuture.cancel(mayInterruptIfRunning);
+        }
 
+        public Connection get() throws InterruptedException, 
ExecutionException {
+            try {
+                return get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+            }catch (TimeoutException x) {
+                throw new ExecutionException(x);
+            }
+        }
+
+        public Connection get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
+            PooledConnection pc = pcFuture.get(timeout,unit);
+            if (pc!=null) {
+                if (result!=null) return result;
+                if (configured.compareAndSet(false, true)) {
+                    try {
+                        pc = borrowConnection(System.currentTimeMillis(),pc);
+                        result = ConnectionPool.this.setupConnection(pc);
+                    } catch (SQLException x) {
+                        cause = x;
+                    } finally {
+                        latch.countDown();
+                    }
+                } else {
+                    //if we reach here, another thread is configuring the 
actual connection 
+                    latch.await(timeout,unit); //this shouldn't block for long
+                }
+                if (result==null) throw new ExecutionException(cause);
+                return result;
+            } else {
+                return null;
+            }
+        }
+
+        public boolean isCancelled() {
+            return pcFuture.isCancelled();
+        }
+
+        public boolean isDone() {
+            return pcFuture.isDone();
+        }
+        
+    }
 
     protected class PoolCleaner extends Thread {
         protected ConnectionPool pool;

Modified: 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/DataSourceFactory.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/DataSourceFactory.java?rev=720693&r1=720692&r2=720693&view=diff
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/DataSourceFactory.java
 (original)
+++ 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/DataSourceFactory.java
 Tue Nov 25 17:14:32 2008
@@ -403,6 +403,8 @@
     }
 
     public static DataSource 
getDataSource(org.apache.tomcat.jdbc.pool.DataSourceProxy dataSource) {
+        if (dataSource instanceof DataSource) return (DataSource)dataSource;
+        //only return a proxy if we didn't implement the DataSource interface
         DataSourceHandler handler = new DataSourceHandler(dataSource);
         DataSource ds = 
(DataSource)Proxy.newProxyInstance(DataSourceFactory.class.getClassLoader(), 
new Class[] {javax.sql.DataSource.class}, handler);
         return ds;

Modified: 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/DataSourceProxy.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/DataSourceProxy.java?rev=720693&r1=720692&r2=720693&view=diff
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/DataSourceProxy.java
 (original)
+++ 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/DataSourceProxy.java
 Tue Nov 25 17:14:32 2008
@@ -20,6 +20,7 @@
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Iterator;
+import java.util.concurrent.Future;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -93,6 +94,17 @@
             return createPool().getConnection();
         return pool.getConnection();
     }
+    
+    /**
+     * Invokes an sync operation to retrieve the connection.
+     * @return
+     * @throws SQLException
+     */
+    public Future<Connection> getConnectionAsync() throws SQLException {
+        if (pool == null)
+            return createPool().getConnectionAsync();
+        return pool.getConnectionAsync();
+    }
 
     /**
      * [EMAIL PROTECTED]

Modified: 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/DefaultTestCase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/DefaultTestCase.java?rev=720693&r1=720692&r2=720693&view=diff
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/DefaultTestCase.java
 (original)
+++ 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/DefaultTestCase.java
 Tue Nov 25 17:14:32 2008
@@ -56,7 +56,7 @@
         p.setMinIdle(threadcount);
         p.setLogAbandoned(false);
         p.setRemoveAbandoned(false);
-        datasource = new org.apache.tomcat.jdbc.pool.DataSourceProxy();
+        datasource = new org.apache.tomcat.jdbc.pool.DataSource();
         datasource.setPoolProperties(p);
         return datasource;
     }

Modified: 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/FairnessTest.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/FairnessTest.java?rev=720693&r1=720692&r2=720693&view=diff
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/FairnessTest.java
 (original)
+++ 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/FairnessTest.java
 Tue Nov 25 17:14:32 2008
@@ -17,6 +17,7 @@
 package org.apache.tomcat.jdbc.test;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.sql.Connection;
@@ -26,6 +27,7 @@
 import javax.sql.DataSource;
 
 import org.apache.tomcat.jdbc.pool.DataSourceFactory;
+import org.apache.tomcat.jdbc.pool.DataSourceProxy;
 
 /**
  * @author Filip Hanik
@@ -143,11 +145,42 @@
         tearDown();
     }
 
+    public void testPoolThreads20Connections10FairAsync() throws Exception {
+        System.out.println("Starting fairness - Tomcat JDBC - Fair - Async");
+        init();
+        this.datasource.getPoolProperties().setMaxActive(10);
+        this.datasource.getPoolProperties().setFairQueue(true);
+        this.threadcount = 20;
+        this.transferProperties();
+        this.datasource.getConnection().close();
+        latch = new CountDownLatch(threadcount);
+        long start = System.currentTimeMillis();
+        TestThread[] threads = new TestThread[threadcount];
+        for (int i=0; i<threadcount; i++) {
+            threads[i] = new TestThread();
+            threads[i].setName("tomcat-pool-"+i);
+            threads[i].async = true;
+            threads[i].d = DataSourceFactory.getDataSource(this.datasource);
+            
+        }
+        for (int i=0; i<threadcount; i++) {
+            threads[i].start();
+        }
+        if (!latch.await(complete+1000,TimeUnit.MILLISECONDS)) {
+            System.out.println("Latch timed out.");
+        }
+        this.run = false;
+        long delta = System.currentTimeMillis() - start;
+        printThreadResults(threads,"testPoolThreads20Connections10FairAsync");
+        System.out.println("Completed fairness - Tomcat JDBC - Fair - Async");
+        tearDown();
+    }
     
     public class TestThread extends Thread {
         protected DataSource d;
         protected String query = null;
         protected long sleep = 10;
+        protected boolean async = false;
         long max = -1, totalmax=0, totalcmax=0, cmax = -1, nroffetch = 0, 
totalruntime = 0;
         public void run() {
             try {
@@ -157,7 +190,12 @@
                     long start = System.nanoTime();
                     Connection con = null;
                     try {
-                        con = d.getConnection();
+                        if (async) {
+                            Future<Connection> cf = 
((DataSourceProxy)d).getConnectionAsync();
+                            con  = cf.get();
+                        } else {
+                            con = d.getConnection();
+                        }
                         long delta = System.nanoTime() - start;
                         totalmax += delta;
                         max = Math.max(delta, max);

Added: 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/SimplePOJOAsyncExample.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/SimplePOJOAsyncExample.java?rev=720693&view=auto
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/SimplePOJOAsyncExample.java
 (added)
+++ 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/SimplePOJOAsyncExample.java
 Tue Nov 25 17:14:32 2008
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.jdbc.test;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.concurrent.Future;
+
+import org.apache.tomcat.jdbc.pool.DataSource;
+import org.apache.tomcat.jdbc.pool.PoolProperties;
+
+public class SimplePOJOAsyncExample {
+
+    public static void main(String[] args) throws Exception {
+        PoolProperties p = new PoolProperties();
+        p.setFairQueue(true);
+        p.setUrl("jdbc:mysql://localhost:3306/mysql?autoReconnect=true");
+        p.setDriverClassName("com.mysql.jdbc.Driver");
+        p.setUsername("root");
+        p.setPassword("password");
+        p.setJmxEnabled(true);
+        p.setTestWhileIdle(false);
+        p.setTestOnBorrow(true);
+        p.setValidationQuery("SELECT 1");
+        p.setTestOnReturn(false);
+        p.setValidationInterval(30000);
+        p.setTimeBetweenEvictionRunsMillis(30000);
+        p.setMaxActive(100);
+        p.setInitialSize(10);
+        p.setMaxWait(10000);
+        p.setRemoveAbandonedTimeout(60);
+        p.setMinEvictableIdleTimeMillis(30000);
+        p.setMinIdle(10);
+        p.setLogAbandoned(true);
+        p.setRemoveAbandoned(true);
+        
p.setJdbcInterceptors("org.apache.tomcat.jdbc.pool.interceptor.ConnectionState;org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer");
+        DataSource datasource = new DataSource();
+        datasource.setPoolProperties(p); 
+        
+        Connection con = null;
+        try {            
+          Future<Connection> future = datasource.getConnectionAsync();
+          while (!future.isDone()) {
+              System.out.println("Connection is not yet available. Do some 
background work");
+              try {
+                  Thread.sleep(100); //simulate work
+              }catch (InterruptedException x) {
+                  Thread.currentThread().interrupted();
+              }
+          }
+          con = future.get(); //should return instantly 
+          Statement st = con.createStatement();
+          ResultSet rs = st.executeQuery("select * from user");
+          int cnt = 1;
+          while (rs.next()) {
+              System.out.println((cnt++)+". Host:" +rs.getString("Host")+" 
User:"+rs.getString("User")+" Password:"+rs.getString("Password"));
+          }
+          rs.close();
+          st.close();
+        } finally {
+          if (con!=null) try {con.close();}catch (Exception ignore) {}
+        }  
+    }
+
+}
\ No newline at end of file

Modified: 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TwoDataSources.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TwoDataSources.java?rev=720693&r1=720692&r2=720693&view=diff
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TwoDataSources.java
 (original)
+++ 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TwoDataSources.java
 Tue Nov 25 17:14:32 2008
@@ -34,14 +34,12 @@
             this.assertTrue("Connection c2 should be working",false);
         }
         try {
-            c1.close();
-            this.assertTrue("Connection should have been closed.",false);
+            this.assertTrue("Connection should have been 
closed.",c1.isClosed());
         }catch (Exception x) {
             this.assertTrue("This is correct, c1 is closed",true);
         }
         try {
-            c2.close();
-            this.assertTrue("Connection c2 should not have been closed.",true);
+            this.assertFalse("Connection c2 should not have been 
closed.",c2.isClosed());
         }catch (Exception x) {
             this.assertTrue("Connection c2 should be working",false);
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to