Author: fhanik
Date: Tue Nov 25 14:57:24 2008
New Revision: 720641

URL: http://svn.apache.org/viewvc?rev=720641&view=rev
Log:
Extend the fair blocking queue to allow asynchronous polling

Added:
    
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java
Modified:
    
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java

Modified: 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java?rev=720641&r1=720640&r2=720641&view=diff
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java
 (original)
+++ 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java
 Tue Nov 25 14:57:24 2008
@@ -21,7 +21,10 @@
 import java.util.LinkedList;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -47,6 +50,9 @@
     //------------------------------------------------------------------
     // USED BY CONPOOL IMPLEMENTATION
     //------------------------------------------------------------------
+    /**
+     * [EMAIL PROTECTED]
+     */
     public boolean offer(E e) {
         final ReentrantLock lock = this.lock;
         lock.lock();
@@ -65,10 +71,16 @@
         return true;
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     */
     public boolean offer(E e, long timeout, TimeUnit unit) throws 
InterruptedException {
         return offer(e);
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     */
     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
         E result = null;
         final ReentrantLock lock = this.lock;
@@ -97,7 +109,39 @@
         }
         return result;
     }
-
+    
+    /**
+     * Request an item from the queue asynchronously
+     * @return - a future pending the result from the queue poll request
+     */
+    public Future<E> pollAsync() {
+        Future<E> result = null;
+        final ReentrantLock lock = this.lock;
+        boolean error = true;
+        lock.lock();
+        try {
+            E item = items.poll();
+            if (item==null) {
+                ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<E>(1);
+                waiters.addLast(c);
+                lock.unlock();
+                result = new ItemFuture(c);
+            } else {
+                lock.unlock();
+                result = new ItemFuture(item);
+            }
+            error = false;
+        } finally {
+            if (error && lock.isHeldByCurrentThread()) {
+                lock.unlock();
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * [EMAIL PROTECTED]
+     */
     public boolean remove(Object e) {
         final ReentrantLock lock = this.lock;
         lock.lock();
@@ -107,15 +151,24 @@
             lock.unlock();
         }
     }
-
+    
+    /**
+     * [EMAIL PROTECTED]
+     */
     public int size() {
         return items.size();
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     */
     public Iterator<E> iterator() {
         return new FairIterator();
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     */
     public E poll() {
         final ReentrantLock lock = this.lock;
         lock.lock();
@@ -126,6 +179,9 @@
         }
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     */
     public boolean contains(Object e) {
         final ReentrantLock lock = this.lock;
         lock.lock();
@@ -140,31 +196,53 @@
     //------------------------------------------------------------------
     // NOT USED BY CONPOOL IMPLEMENTATION
     //------------------------------------------------------------------
-
+    /**
+     * [EMAIL PROTECTED]
+     */
     public boolean add(E e) {
         return offer(e);
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     * @throws UnsupportedOperation - this operation is not supported
+     */
     public int drainTo(Collection<? super E> c, int maxElements) {
         throw new UnsupportedOperationException("int drainTo(Collection<? 
super E> c, int maxElements)");
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     * @throws UnsupportedOperation - this operation is not supported
+     */
     public int drainTo(Collection<? super E> c) {
         return drainTo(c,Integer.MAX_VALUE);
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     */
     public void put(E e) throws InterruptedException {
         offer(e);
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     */
     public int remainingCapacity() {
         return Integer.MAX_VALUE - size();
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     */
     public E take() throws InterruptedException {
         return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     */
     public boolean addAll(Collection<? extends E> c) {
         Iterator i = c.iterator();
         while (i.hasNext()) {
@@ -174,56 +252,146 @@
         return true;
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     * @throws UnsupportedOperation - this operation is not supported
+     */
     public void clear() {
         throw new UnsupportedOperationException("void clear()");
 
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     * @throws UnsupportedOperation - this operation is not supported
+     */
     public boolean containsAll(Collection<?> c) {
         throw new UnsupportedOperationException("boolean 
containsAll(Collection<?> c)");
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     */
     public boolean isEmpty() {
         return size() == 0;
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     * @throws UnsupportedOperation - this operation is not supported
+     */
     public boolean removeAll(Collection<?> c) {
         throw new UnsupportedOperationException("boolean 
removeAll(Collection<?> c)");
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     * @throws UnsupportedOperation - this operation is not supported
+     */
     public boolean retainAll(Collection<?> c) {
         throw new UnsupportedOperationException("boolean 
retainAll(Collection<?> c)");
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     * @throws UnsupportedOperation - this operation is not supported
+     */
     public Object[] toArray() {
         throw new UnsupportedOperationException("Object[] toArray()");
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     * @throws UnsupportedOperation - this operation is not supported
+     */
     public <T> T[] toArray(T[] a) {
         throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     * @throws UnsupportedOperation - this operation is not supported
+     */
     public E element() {
         throw new UnsupportedOperationException("E element()");
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     * @throws UnsupportedOperation - this operation is not supported
+     */
     public E peek() {
         throw new UnsupportedOperationException("E peek()");
     }
 
+    /**
+     * [EMAIL PROTECTED]
+     * @throws UnsupportedOperation - this operation is not supported
+     */
     public E remove() {
         throw new UnsupportedOperationException("E remove()");
     }
 
 
 
+    //------------------------------------------------------------------
+    // Future used to check and see if a connection has been made available
+    //------------------------------------------------------------------
+    protected class ItemFuture<T> implements Future<T> {
+        protected volatile T item = null;
+        protected volatile ExchangeCountDownLatch<T> latch = null;
+        protected volatile boolean canceled = false;
+        
+        public ItemFuture(T item) {
+            this.item = item;
+        }
+        
+        public ItemFuture(ExchangeCountDownLatch<T> latch) {
+            this.latch = latch;
+        }
+        
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false; //don't allow cancel for now
+        }
 
+        public T get() throws InterruptedException, ExecutionException {
+            if (item!=null) {
+                return item;
+            } else if (latch!=null) {
+                latch.await();
+                return latch.getItem();
+            } else {
+                throw new ExecutionException("ItemFuture incorrectly 
instantiated. Bug in the code?", new Exception());
+            }
+        }
+
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
+            if (item!=null) {
+                return item;
+            } else if (latch!=null) {
+                boolean timedout = !latch.await(timeout, unit);
+                if (timedout) throw new TimeoutException();
+                else return latch.getItem();
+            } else {
+                throw new ExecutionException("ItemFuture incorrectly 
instantiated. Bug in the code?", new Exception());
+            }
+        }
+
+        public boolean isCancelled() {
+            return false;
+        }
+
+        public boolean isDone() {
+            return (item!=null || latch.getItem()!=null);
+        }
+        
+    }
 
     //------------------------------------------------------------------
     // Count down latch that can be used to exchange information
     //------------------------------------------------------------------
     protected class ExchangeCountDownLatch<T> extends CountDownLatch {
-        protected T item;
+        protected volatile T item;
         public ExchangeCountDownLatch(int i) {
             super(i);
         }

Added: 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java?rev=720641&view=auto
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java
 (added)
+++ 
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java
 Tue Nov 25 14:57:24 2008
@@ -0,0 +1,66 @@
+package org.apache.tomcat.jdbc.test;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.tomcat.jdbc.pool.FairBlockingQueue;
+
+import junit.framework.TestCase;
+
+public class TestAsyncQueue extends TestCase {
+    protected FairBlockingQueue<Object> queue = null;
+    protected void setUp() throws Exception {
+        super.setUp();
+        this.queue = new FairBlockingQueue<Object>();
+    }
+
+    protected void tearDown() throws Exception {
+        this.queue = null;
+        super.tearDown();
+    }
+    
+    
+    public void testAsyncPoll1() throws Exception {
+        Object item = new Object();
+        queue.offer(item);
+        Future<Object> future = queue.pollAsync();
+        assertEquals(future.get(),item);
+    }
+
+    public void testAsyncPoll2() throws Exception {
+        Object item = new Object();
+        OfferThread thread = new OfferThread(item,5000);
+        thread.start();
+        Future<Object> future = queue.pollAsync();
+        try {
+            future.get(2000, TimeUnit.MILLISECONDS);
+            this.assertFalse("Request should have timed out",true);
+        }catch (TimeoutException x) {
+            this.assertTrue("Request timed out properly",true);
+        }catch (Exception x) {
+            this.assertTrue("Request threw an error",false);
+            x.printStackTrace();
+        }
+        assertEquals(future.get(),item);
+    }
+
+    protected class OfferThread extends Thread {
+        Object item = null;
+        long delay = 5000;
+        volatile boolean offered = false;
+        public OfferThread(Object i, long d) {
+            this.item = i;
+            this.delay = d;
+            this.setDaemon(false);
+            this.setName(TestAsyncQueue.class.getName()+"-OfferThread");
+        }
+        public void run() {
+            try {
+                this.sleep(delay);
+            }catch (Exception ignore){}
+            offered = true;
+            TestAsyncQueue.this.queue.offer(item);
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to