Author: dkulp Date: Fri Aug 19 17:41:13 2011 New Revision: 1159705 URL: http://svn.apache.org/viewvc?rev=1159705&view=rev Log: Merged revisions 1159696 via svnmerge from https://svn.us.apache.org/repos/asf/cxf/branches/2.4.x-fixes
................ r1159696 | dkulp | 2011-08-19 13:20:59 -0400 (Fri, 19 Aug 2011) | 14 lines Merged revisions 1159695 via svnmerge from https://svn.apache.org/repos/asf/cxf/trunk ........ r1159695 | dkulp | 2011-08-19 13:18:56 -0400 (Fri, 19 Aug 2011) | 6 lines [CXF-3750] Fix problem with CXF could lockup with a lot of long running one-way operations. Fix problems with OneWay and decoupled ws-addr not buffering the incoming stream properly Fix AutomaticWorkqueue to actually create threads as needed PRIOR to the queue filling completely up. ........ ................ Modified: cxf/branches/2.3.x-fixes/ (props changed) cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java cxf/branches/2.3.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java cxf/branches/2.3.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java Propchange: cxf/branches/2.3.x-fixes/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java?rev=1159705&r1=1159704&r2=1159705&view=diff ============================================================================== --- cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java (original) +++ cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/OneWayProcessorInterceptor.java Fri Aug 19 17:41:13 2011 @@ -76,7 +76,7 @@ public class OneWayProcessorInterceptor //need to suck in all the data from the input stream as //the transport might discard any data on the stream when this //thread unwinds or when the empty response is sent back - DelegatingInputStream in = message.get(DelegatingInputStream.class); + DelegatingInputStream in = message.getContent(DelegatingInputStream.class); if (in != null) { in.cacheInput(); } @@ -102,12 +102,13 @@ public class OneWayProcessorInterceptor if (Boolean.FALSE.equals(o)) { chain.pause(); try { - synchronized (chain) { + final Object lock = new Object(); + synchronized (lock) { message.getExchange().get(Bus.class).getExtension(WorkQueueManager.class) .getAutomaticWorkQueue().execute(new Runnable() { public void run() { - synchronized (chain) { - chain.notifyAll(); + synchronized (lock) { + lock.notifyAll(); } chain.resume(); } @@ -115,7 +116,7 @@ public class OneWayProcessorInterceptor //wait a few milliseconds for the background thread to start processing //Mostly just to make an attempt at keeping the ordering of the //messages coming in from a client. Not guaranteed though. - chain.wait(20); + lock.wait(20); } } catch (RejectedExecutionException e) { //the executor queue is full, so run the task in the caller thread Modified: cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java?rev=1159705&r1=1159704&r2=1159705&view=diff ============================================================================== --- cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java (original) +++ cxf/branches/2.3.x-fixes/rt/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java Fri Aug 19 17:41:13 2011 @@ -19,6 +19,8 @@ package org.apache.cxf.workqueue; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.concurrent.DelayQueue; @@ -30,6 +32,7 @@ import java.util.concurrent.ThreadPoolEx import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -52,6 +55,9 @@ public class AutomaticWorkQueueImpl exte WorkQueueManagerImpl manager; String name = "default"; + final int corePoolSize; + final int maxPoolSize; + final ReentrantLock mainLock; public AutomaticWorkQueueImpl() { this(DEFAULT_MAX_QUEUE_SIZE); @@ -125,6 +131,19 @@ public class AutomaticWorkQueueImpl exte // start the watch dog thread watchDog.setDaemon(true); watchDog.start(); + + corePoolSize = this.getCorePoolSize(); + maxPoolSize = this.getMaximumPoolSize(); + + ReentrantLock l = null; + try { + Field f = ThreadPoolExecutor.class.getDeclaredField("mainLock"); + f.setAccessible(true); + l = (ReentrantLock)f.get(this); + } catch (Throwable t) { + l = new ReentrantLock(); + } + mainLock = l; } private static ThreadFactory createThreadFactory(final String name) { ThreadGroup group; @@ -337,7 +356,33 @@ public class AutomaticWorkQueueImpl exte } } }; + //The ThreadPoolExecutor in the JDK doesn't expand the number + //of threads until the queue is full. However, we would + //prefer the number of threads to expand immediately and + //only uses the queue if we've reached the maximum number + //of threads. Thus, we'll set the core size to the max, + //add the runnable, and set back. That will cause the + //threads to be created as needed. super.execute(r); + if (!getQueue().isEmpty() && this.getPoolSize() < maxPoolSize) { + mainLock.lock(); + try { + int ps = this.getPoolSize(); + int sz = getQueue().size(); + int sz2 = this.getActiveCount(); + + if ((sz + sz2) > ps) { + Method m = ThreadPoolExecutor.class.getDeclaredMethod("addIfUnderMaximumPoolSize", + Runnable.class); + m.setAccessible(true); + m.invoke(this, new Object[1]); + } + } catch (Exception ex) { + //ignore + } finally { + mainLock.unlock(); + } + } } // WorkQueue interface Modified: cxf/branches/2.3.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java?rev=1159705&r1=1159704&r2=1159705&view=diff ============================================================================== --- cxf/branches/2.3.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java (original) +++ cxf/branches/2.3.x-fixes/rt/core/src/test/java/org/apache/cxf/workqueue/AutomaticWorkQueueTest.java Fri Aug 19 17:41:13 2011 @@ -281,7 +281,7 @@ public class AutomaticWorkQueueTest exte // Give threads a chance to dequeue (5sec max) int i = 0; - while (workqueue.getPoolSize() != 10 && i++ < 50) { + while (workqueue.getPoolSize() > 10 && i++ < 50) { try { Thread.sleep(100); } catch (InterruptedException ie) { @@ -300,7 +300,7 @@ public class AutomaticWorkQueueTest exte } @Test - public void testThreadPoolShrinkUnbounded() { + public void testThreadPoolShrinkUnbounded() throws Exception { workqueue = new AutomaticWorkQueueImpl(UNBOUNDED_MAX_QUEUE_SIZE, INITIAL_SIZE, UNBOUNDED_HIGH_WATER_MARK, DEFAULT_LOW_WATER_MARK, 100L); @@ -311,18 +311,15 @@ public class AutomaticWorkQueueTest exte // Give threads a chance to dequeue (5sec max) int i = 0; int last = workqueue.getPoolSize(); - while (workqueue.getPoolSize() != DEFAULT_LOW_WATER_MARK && i++ < 50) { + while (workqueue.getPoolSize() > DEFAULT_LOW_WATER_MARK && i++ < 50) { if (last != workqueue.getPoolSize()) { last = workqueue.getPoolSize(); i = 0; } - try { - Thread.sleep(100); - } catch (InterruptedException ie) { - // ignore - } + Thread.sleep(100); } - assertTrue("threads_total()", workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK); + int sz = workqueue.getPoolSize(); + assertTrue("threads_total(): " + sz, workqueue.getPoolSize() <= DEFAULT_LOW_WATER_MARK); } @Test Modified: cxf/branches/2.3.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java?rev=1159705&r1=1159704&r2=1159705&view=diff ============================================================================== --- cxf/branches/2.3.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java (original) +++ cxf/branches/2.3.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java Fri Aug 19 17:41:13 2011 @@ -429,7 +429,7 @@ public final class ContextUtils { //need to suck in all the data from the input stream as //the transport might discard any data on the stream when this //thread unwinds or when the empty response is sent back - DelegatingInputStream in = inMessage.get(DelegatingInputStream.class); + DelegatingInputStream in = inMessage.getContent(DelegatingInputStream.class); if (in != null) { in.cacheInput(); }
