Author: fhanik
Date: Tue Nov 25 14:57:24 2008
New Revision: 720641
URL: http://svn.apache.org/viewvc?rev=720641&view=rev
Log:
Extend the fair blocking queue to allow asynchronous polling
Added:
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java
Modified:
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java
Modified:
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java?rev=720641&r1=720640&r2=720641&view=diff
==============================================================================
---
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java
(original)
+++
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java
Tue Nov 25 14:57:24 2008
@@ -21,7 +21,10 @@
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -47,6 +50,9 @@
//------------------------------------------------------------------
// USED BY CONPOOL IMPLEMENTATION
//------------------------------------------------------------------
+ /**
+ * {@inheritDoc}
+ */
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
@@ -65,10 +71,16 @@
return true;
}
+ /**
+ * {@inheritDoc}
+ */
public boolean offer(E e, long timeout, TimeUnit unit) throws
InterruptedException {
return offer(e);
}
+ /**
+ * {@inheritDoc}
+ */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E result = null;
final ReentrantLock lock = this.lock;
@@ -97,7 +109,39 @@
}
return result;
}
-
+
+    /**
+     * Requests an item from the queue without blocking the calling thread.
+     * <p>
+     * If an item is immediately available it is removed from the queue and
+     * wrapped in a future that already holds it. Otherwise a new exchange
+     * latch is appended to the waiter list and the returned future waits on
+     * that latch when one of its <code>get</code> methods is called.
+     *
+     * @return a future pending the result from the queue poll request
+     */
+    public Future<E> pollAsync() {
+        final ReentrantLock lock = this.lock;
+        E item;
+        ExchangeCountDownLatch<E> latch = null;
+        lock.lock();
+        try {
+            item = items.poll();
+            if (item == null) {
+                // nothing available: register a waiter while still holding the lock
+                latch = new ExchangeCountDownLatch<E>(1);
+                waiters.addLast(latch);
+            }
+        } finally {
+            // single release point, taken on both normal and exceptional exit
+            lock.unlock();
+        }
+        // the futures are built outside the lock, as before
+        if (item != null) {
+            return new ItemFuture<E>(item);
+        }
+        return new ItemFuture<E>(latch);
+    }
+
+ /**
+ * {@inheritDoc}
+ */
public boolean remove(Object e) {
final ReentrantLock lock = this.lock;
lock.lock();
@@ -107,15 +151,24 @@
lock.unlock();
}
}
-
+
+ /**
+ * {@inheritDoc}
+ */
public int size() {
return items.size();
}
+ /**
+ * {@inheritDoc}
+ */
public Iterator<E> iterator() {
return new FairIterator();
}
+ /**
+ * {@inheritDoc}
+ */
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
@@ -126,6 +179,9 @@
}
}
+ /**
+ * {@inheritDoc}
+ */
public boolean contains(Object e) {
final ReentrantLock lock = this.lock;
lock.lock();
@@ -140,31 +196,53 @@
//------------------------------------------------------------------
// NOT USED BY CONPOOL IMPLEMENTATION
//------------------------------------------------------------------
-
+ /**
+ * {@inheritDoc}
+ */
public boolean add(E e) {
return offer(e);
}
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
public int drainTo(Collection<? super E> c, int maxElements) {
throw new UnsupportedOperationException("int drainTo(Collection<?
super E> c, int maxElements)");
}
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
public int drainTo(Collection<? super E> c) {
return drainTo(c,Integer.MAX_VALUE);
}
+ /**
+ * {@inheritDoc}
+ */
public void put(E e) throws InterruptedException {
offer(e);
}
+ /**
+ * {@inheritDoc}
+ */
public int remainingCapacity() {
return Integer.MAX_VALUE - size();
}
+ /**
+ * {@inheritDoc}
+ */
public E take() throws InterruptedException {
return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
+ /**
+ * {@inheritDoc}
+ */
public boolean addAll(Collection<? extends E> c) {
Iterator i = c.iterator();
while (i.hasNext()) {
@@ -174,56 +252,146 @@
return true;
}
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
public void clear() {
throw new UnsupportedOperationException("void clear()");
}
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
public boolean containsAll(Collection<?> c) {
throw new UnsupportedOperationException("boolean
containsAll(Collection<?> c)");
}
+ /**
+ * {@inheritDoc}
+ */
public boolean isEmpty() {
return size() == 0;
}
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException("boolean
removeAll(Collection<?> c)");
}
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("boolean
retainAll(Collection<?> c)");
}
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
public Object[] toArray() {
throw new UnsupportedOperationException("Object[] toArray()");
}
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
}
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
public E element() {
throw new UnsupportedOperationException("E element()");
}
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
public E peek() {
throw new UnsupportedOperationException("E peek()");
}
+ /**
+ * {@inheritDoc}
+ * @throws UnsupportedOperationException - this operation is not supported
+ */
public E remove() {
throw new UnsupportedOperationException("E remove()");
}
+ //------------------------------------------------------------------
+ // Future used to check and see if a connection has been made available
+ //------------------------------------------------------------------
+    /**
+     * Future returned by an asynchronous poll. It is created in one of two
+     * states: holding an item directly, or holding an exchange latch whose
+     * item is read once the latch has been awaited. Cancellation is not
+     * supported.
+     *
+     * @param <T> the type of item the future yields
+     */
+    protected class ItemFuture<T> implements Future<T> {
+        /** Item supplied at construction; null when the future waits on a latch. */
+        protected volatile T item = null;
+        /** Latch the item is read from; null when the item was supplied directly. */
+        protected volatile ExchangeCountDownLatch<T> latch = null;
+        /** Never set by this class; cancellation is currently not supported. */
+        protected volatile boolean canceled = false;
+
+        /**
+         * Creates a future that holds the given item.
+         * @param item the item returned by the <code>get</code> methods
+         */
+        public ItemFuture(T item) {
+            this.item = item;
+        }
+
+        /**
+         * Creates a future that obtains its item from the given latch.
+         * @param latch the latch to await and read the item from
+         */
+        public ItemFuture(ExchangeCountDownLatch<T> latch) {
+            this.latch = latch;
+        }
+
+        /**
+         * Cancellation is not supported.
+         * @param mayInterruptIfRunning ignored
+         * @return always false
+         */
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false; //don't allow cancel for now
+        }
+
+        /**
+         * Returns the item, waiting on the latch if the item was not supplied
+         * at construction.
+         * @return the directly held item, or the latch's item after awaiting it
+         * @throws InterruptedException if interrupted while awaiting the latch
+         * @throws ExecutionException if the future holds neither an item nor a latch
+         */
+        public T get() throws InterruptedException, ExecutionException {
+            if (item != null) {
+                return item;
+            } else if (latch != null) {
+                latch.await();
+                return latch.getItem();
+            } else {
+                // ExecutionException(String) is protected, hence the placeholder cause
+                throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
+            }
+        }
+
+        /**
+         * Returns the item, waiting on the latch for at most the given time.
+         * @param timeout the maximum time to wait
+         * @param unit the unit of <code>timeout</code>
+         * @return the directly held item, or the latch's item after awaiting it
+         * @throws InterruptedException if interrupted while awaiting the latch
+         * @throws ExecutionException if the future holds neither an item nor a latch
+         * @throws TimeoutException if the latch was not released within the timeout
+         */
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            if (item != null) {
+                return item;
+            } else if (latch != null) {
+                boolean timedout = !latch.await(timeout, unit);
+                if (timedout) {
+                    throw new TimeoutException();
+                }
+                return latch.getItem();
+            } else {
+                // ExecutionException(String) is protected, hence the placeholder cause
+                throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
+            }
+        }
+
+        /**
+         * @return always false, since cancellation is not supported
+         */
+        public boolean isCancelled() {
+            return false;
+        }
+
+        /**
+         * Reports whether an item is available without blocking.
+         * @return true if an item is held directly or the latch already has
+         *         one; false otherwise, including when the future has neither
+         *         an item nor a latch
+         */
+        public boolean isDone() {
+            if (item != null) {
+                return true;
+            }
+            // guard against a missing latch instead of throwing NullPointerException
+            final ExchangeCountDownLatch<T> current = latch;
+            return current != null && current.getItem() != null;
+        }
+
+    }
//------------------------------------------------------------------
// Count down latch that can be used to exchange information
//------------------------------------------------------------------
protected class ExchangeCountDownLatch<T> extends CountDownLatch {
- protected T item;
+ protected volatile T item;
public ExchangeCountDownLatch(int i) {
super(i);
}
Added:
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java?rev=720641&view=auto
==============================================================================
---
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java
(added)
+++
tomcat/trunk/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java
Tue Nov 25 14:57:24 2008
@@ -0,0 +1,66 @@
+package org.apache.tomcat.jdbc.test;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.tomcat.jdbc.pool.FairBlockingQueue;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for the asynchronous poll of <code>FairBlockingQueue</code>.
+ */
+public class TestAsyncQueue extends TestCase {
+    protected FairBlockingQueue<Object> queue = null;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        this.queue = new FairBlockingQueue<Object>();
+    }
+
+    protected void tearDown() throws Exception {
+        this.queue = null;
+        super.tearDown();
+    }
+
+    /**
+     * An item offered before the poll must be returned by the future.
+     */
+    public void testAsyncPoll1() throws Exception {
+        Object item = new Object();
+        queue.offer(item);
+        Future<Object> future = queue.pollAsync();
+        assertEquals(item, future.get());
+    }
+
+    /**
+     * With the offer delayed by 5 seconds, a 2 second timed get must time
+     * out, and a subsequent untimed get must return the offered item.
+     */
+    public void testAsyncPoll2() throws Exception {
+        Object item = new Object();
+        OfferThread thread = new OfferThread(item, 5000);
+        thread.start();
+        Future<Object> future = queue.pollAsync();
+        try {
+            future.get(2000, TimeUnit.MILLISECONDS);
+            fail("Request should have timed out");
+        } catch (TimeoutException expected) {
+            // Request timed out properly. Any other exception propagates and
+            // is reported by JUnit with its full stack trace.
+        }
+        assertEquals(item, future.get());
+    }
+
+    /**
+     * Thread that offers a single item to the queue under test after a delay.
+     */
+    protected class OfferThread extends Thread {
+        Object item = null;
+        long delay = 5000;
+        volatile boolean offered = false;
+
+        /**
+         * @param i the item to offer
+         * @param d the delay in milliseconds before offering it
+         */
+        public OfferThread(Object i, long d) {
+            this.item = i;
+            this.delay = d;
+            this.setDaemon(false);
+            this.setName(TestAsyncQueue.class.getName() + "-OfferThread");
+        }
+
+        public void run() {
+            try {
+                Thread.sleep(delay);
+            } catch (InterruptedException interrupted) {
+                // deliberately fall through: the item is still offered so that
+                // a test blocked in future.get() cannot hang forever
+            }
+            offered = true;
+            TestAsyncQueue.this.queue.offer(item);
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]