Author: dkulp Date: Thu Jul 12 14:28:42 2012 New Revision: 1360694 URL: http://svn.apache.org/viewvc?rev=1360694&view=rev Log: Merged revisions 1360672 via git cherry-pick from https://svn.apache.org/repos/asf/cxf/branches/2.6.x-fixes
........ r1360672 | dkulp | 2012-07-12 09:47:24 -0400 (Thu, 12 Jul 2012) | 10 lines Merged revisions 1360401 via git cherry-pick from https://svn.apache.org/repos/asf/cxf/trunk ........ r1360401 | dkulp | 2012-07-11 17:15:28 -0400 (Wed, 11 Jul 2012) | 2 lines Updates to workqueue to work better with Java7 ........ ........ Modified: cxf/branches/2.5.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Modified: cxf/branches/2.5.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1360694&r1=1360693&r2=1360694&view=diff ============================================================================== --- cxf/branches/2.5.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original) +++ cxf/branches/2.5.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Thu Jul 12 14:28:42 2012 @@ -63,8 +63,12 @@ public class AutomaticWorkQueueImpl impl int lowWaterMark; int highWaterMark; long dequeueTimeout; + volatile int approxThreadCount; ThreadPoolExecutor executor; + Method addWorkerMethod; + Object addWorkerArgs[]; + AWQThreadFactory threadFactory; ReentrantLock mainLock; @@ -204,10 +208,27 @@ public class AutomaticWorkQueueImpl impl l = new ReentrantLock(); } mainLock = l; + + try { + //java 5/6 + addWorkerMethod = ThreadPoolExecutor.class.getDeclaredMethod("addIfUnderMaximumPoolSize", + Runnable.class); + addWorkerArgs = new Object[] {null}; + } catch (Throwable t) { + try { + //java 7 + addWorkerMethod = ThreadPoolExecutor.class.getDeclaredMethod("addWorker", + Runnable.class, Boolean.TYPE); + addWorkerArgs = new Object[] {null, Boolean.FALSE}; + } catch (Throwable t2) { + //nothing we cando + } + } + } return executor; } - private static AWQThreadFactory createThreadFactory(final String name) { + private AWQThreadFactory createThreadFactory(final String nm) { ThreadGroup group; try { //Try and find the highest level ThreadGroup that we're allowed to use. @@ -227,14 +248,14 @@ public class AutomaticWorkQueueImpl impl //ignore - if we get here, the "group" is as high as //the security manager will allow us to go. Use that one. } - return new ThreadGroup(group, name + "-workqueue"); + return new ThreadGroup(group, nm + "-workqueue"); } } ); } catch (SecurityException e) { - group = new ThreadGroup(name + "-workqueue"); + group = new ThreadGroup(nm + "-workqueue"); } - return new AWQThreadFactory(group, name); + return new AWQThreadFactory(group, nm); } static class DelayedTaskWrapper implements Delayed, Runnable { @@ -306,7 +327,7 @@ public class AutomaticWorkQueueImpl impl } } - static class AWQThreadFactory implements ThreadFactory { + class AWQThreadFactory implements ThreadFactory { final AtomicInteger threadNumber = new AtomicInteger(1); ThreadGroup group; String name; @@ -319,12 +340,22 @@ public class AutomaticWorkQueueImpl impl loader = AutomaticWorkQueueImpl.class.getClassLoader(); } - public Thread newThread(Runnable r) { + public Thread newThread(final Runnable r) { if (group.isDestroyed()) { group = new ThreadGroup(group.getParent(), name + "-workqueue"); } + Runnable wrapped = new Runnable() { + public void run() { + ++approxThreadCount; + try { + r.run(); + } finally { + --approxThreadCount; + } + } + }; final Thread t = new Thread(group, - r, + wrapped, name + "-workqueue-" + threadNumber.getAndIncrement(), 0); AccessController.doPrivileged(new PrivilegedAction<Boolean>() { @@ -404,12 +435,12 @@ public class AutomaticWorkQueueImpl impl //of threads until the queue is full. However, we would //prefer the number of threads to expand immediately and //only uses the queue if we've reached the maximum number - //of threads. Thus, we'll set the core size to the max, - //add the runnable, and set back. That will cause the - //threads to be created as needed. + //of threads. ThreadPoolExecutor ex = getExecutor(); ex.execute(r); - if (!ex.getQueue().isEmpty() && this.getPoolSize() < highWaterMark) { + if (addWorkerMethod != null + && !ex.getQueue().isEmpty() + && this.approxThreadCount < highWaterMark) { mainLock.lock(); try { int ps = this.getPoolSize(); @@ -417,9 +448,7 @@ public class AutomaticWorkQueueImpl impl int sz2 = this.getActiveCount(); if ((sz + sz2) > ps) { - Method m = ThreadPoolExecutor.class.getDeclaredMethod("addIfUnderMaximumPoolSize", - Runnable.class); - ReflectionUtil.setAccessible(m).invoke(executor, new Object[1]); + ReflectionUtil.setAccessible(addWorkerMethod).invoke(executor, addWorkerArgs); } } catch (Exception exc) { //ignore
