Author: ningjiang
Date: Wed Apr 20 10:30:44 2011
New Revision: 1095349
URL: http://svn.apache.org/viewvc?rev=1095349&view=rev
Log:
CXF-3464 AutomaticWorkQueueImpl uses a DelayQueue to accept the tasks that are
delayed
Modified:
cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
Modified:
cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java
URL:
http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1095349&r1=1095348&r2=1095349&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Wed Apr 20 10:30:44 2011
@@ -21,11 +21,14 @@ package org.apache.cxf.workqueue;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -44,6 +47,8 @@ public class AutomaticWorkQueueImpl exte
LogUtils.getL7dLogger(AutomaticWorkQueueImpl.class);
int maxQueueSize;
+ DelayQueue<DelayedTaskWrapper> delayQueue = new DelayQueue<DelayedTaskWrapper>();
+ WatchDog watchDog = new WatchDog(delayQueue);
WorkQueueManagerImpl manager;
String name = "default";
@@ -116,6 +121,10 @@ public class AutomaticWorkQueueImpl exte
}
setCorePoolSize(lowWaterMark);
}
+
+ // start the watch dog thread
+ watchDog.setDaemon(true);
+ watchDog.start();
}
private static ThreadFactory createThreadFactory(final String name) {
ThreadGroup group;
@@ -146,17 +155,87 @@ public class AutomaticWorkQueueImpl exte
}
return new AWQThreadFactory(group, name);
}
+
+ /**
+  * Pairs a Runnable with the wall-clock time (milliseconds, as reported by
+  * System.currentTimeMillis()) at which it becomes due, so that it can be
+  * stored in a DelayQueue and run once the delay has elapsed.
+  */
+ static class DelayedTaskWrapper implements Delayed, Runnable {
+     /** Absolute time in milliseconds at which the task becomes due. */
+     long trigger;
+     /** The task that run() delegates to. */
+     Runnable work;
+
+     /**
+      * @param work  the task to run once the delay has elapsed
+      * @param delay the delay in milliseconds, counted from now
+      */
+     DelayedTaskWrapper(Runnable work, long delay) {
+         this.work = work;
+         long now = System.currentTimeMillis();
+         // Saturate rather than wrap around: an overflowing sum would yield
+         // a negative trigger and the task would look due immediately.
+         if (delay > 0 && now > Long.MAX_VALUE - delay) {
+             trigger = Long.MAX_VALUE;
+         } else {
+             trigger = now + delay;
+         }
+     }
+
+     /**
+      * @return the remaining delay converted to the given unit; zero or
+      *         negative once the trigger time has passed
+      */
+     public long getDelay(TimeUnit unit) {
+         long n = trigger - System.currentTimeMillis();
+         return unit.convert(n, TimeUnit.MILLISECONDS);
+     }
+
+     /**
+      * Orders tasks by due time, earliest first. Other Delayed
+      * implementations are compared by their remaining delay instead of
+      * being cast, so they no longer cause a ClassCastException.
+      */
+     public int compareTo(Delayed delayed) {
+         if (delayed == this) {
+             return 0;
+         }
+         long mine;
+         long theirs;
+         if (delayed instanceof DelayedTaskWrapper) {
+             // Same type: compare the fixed trigger times directly.
+             mine = this.trigger;
+             theirs = ((DelayedTaskWrapper)delayed).trigger;
+         } else {
+             mine = getDelay(TimeUnit.MILLISECONDS);
+             theirs = delayed.getDelay(TimeUnit.MILLISECONDS);
+         }
+         if (mine < theirs) {
+             return -1;
+         }
+         return mine > theirs ? 1 : 0;
+     }
+
+     /** Runs the wrapped task on the calling thread. */
+     public void run() {
+         work.run();
+     }
+
+ }
+
+ /**
+  * Thread that waits on the delay queue and passes each task to
+  * execute(Runnable) of the enclosing work queue once its delay has expired.
+  */
+ class WatchDog extends Thread {
+     DelayQueue<DelayedTaskWrapper> delayQueue;
+     AtomicBoolean shutdown = new AtomicBoolean(false);
+
+     /**
+      * @param queue the delay queue this watchdog drains
+      */
+     WatchDog(DelayQueue<DelayedTaskWrapper> queue) {
+         delayQueue = queue;
+     }
+
+     /**
+      * Asks the watchdog to stop and wakes it up if it is waiting for a task.
+      */
+     public void shutdown() {
+         shutdown.set(true);
+         // take() blocks for as long as no task is due, so the flag alone
+         // would never be noticed; interrupting unblocks the wait.
+         interrupt();
+     }
+
+     /**
+      * Takes tasks from the delay queue as they become due and executes
+      * them until shutdown is requested or the thread is interrupted.
+      */
+     public void run() {
+         try {
+             while (!shutdown.get()) {
+                 // take() never returns null: it waits until a task is due.
+                 DelayedTaskWrapper task = delayQueue.take();
+                 try {
+                     execute(task);
+                 } catch (Exception ex) {
+                     // Keep the watchdog alive if a single task is rejected
+                     // or fails; pass the exception so the trace is kept.
+                     LOG.log(Level.WARNING,
+                             "Executing the task from DelayQueue with exception: " + ex, ex);
+                 }
+             }
+         } catch (InterruptedException e) {
+             // Interruption is the stop signal; the thread ends here.
+             if (LOG.isLoggable(Level.FINER)) {
+                 LOG.finer("The DelayQueue watchdog Task is stopping");
+             }
+         }
+
+     }
+
+ }
static class AWQThreadFactory implements ThreadFactory {
final AtomicInteger threadNumber = new AtomicInteger(1);
ThreadGroup group;
String name;
ClassLoader loader;
+
AWQThreadFactory(ThreadGroup gp, String nm) {
group = gp;
name = nm;
//force the loader to be the loader of CXF, not the application loader
loader = AutomaticWorkQueueImpl.class.getClassLoader();
}
+
public Thread newThread(Runnable r) {
if (group.isDestroyed()) {
group = new ThreadGroup(group.getParent(), name + "-workqueue");
@@ -273,18 +352,7 @@ public class AutomaticWorkQueueImpl exte
}
public void schedule(final Runnable work, final long delay) {
- // temporary implementation, replace with shared long-lived scheduler
- // task
- execute(new Runnable() {
- public void run() {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException ie) {
- // ignore
- }
- work.run();
- }
- });
+ delayQueue.put(new DelayedTaskWrapper(work, delay));
}
// AutomaticWorkQueue interface
@@ -301,6 +369,7 @@ public class AutomaticWorkQueueImpl exte
if (f instanceof AWQThreadFactory) {
((AWQThreadFactory)f).shutdown();
}
+ watchDog.shutdown();
}
/**