Author: dkulp Date: Thu Jul 16 19:41:56 2009 New Revision: 794807 URL: http://svn.apache.org/viewvc?rev=794807&view=rev Log: Merged revisions 794799 via svnmerge from https://svn.apache.org/repos/asf/cxf/branches/2.2.x-fixes
................ r794799 | dkulp | 2009-07-16 15:35:27 -0400 (Thu, 16 Jul 2009) | 10 lines Merged revisions 794779 via svnmerge from https://svn.apache.org/repos/asf/cxf/trunk ........ r794779 | dkulp | 2009-07-16 15:02:36 -0400 (Thu, 16 Jul 2009) | 2 lines [CXF-2343] Change jms transport throttling to use a disconnect/reconnect instead of bogus message selector. Patch from Paul Hadrosek applied. ........ ................ Modified: cxf/branches/2.1.x-fixes/ (props changed) cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/resources/jms_test_config.xml Propchange: cxf/branches/2.1.x-fixes/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Jul 16 19:41:56 2009 @@ -1,2 +1,2 @@ 
-/cxf/branches/2.2.x-fixes:743446,753380,753397,753421,754585,755365,757499,757859,757899,757935,757951,758195,758303,758308,758378,758690,758910,759890,759961,759963-759964,759966,760029,760073,760150,760171,760178,760198,760212,760456,760468,760582,760938,761094,761113,761120,761317,761759,761789,762393,762518,762567,763200,763272,763495,763854,763931,763942,763953,764033-764034,764581,764599-764606,764887,765357,766013,766058,766100-766101,766763,766770,766860,766962-766963,767159,767191,767927,771416,772143,772402,772658,772714,773009-773010,773027,773049,773146,773581,773691,773693,774446-774496,774558,774760,774851,774979,775423,776024-776025,776218,776429,776459,777189,777224,777243,777481,777505,777572,777580,780033,780184,780213,780421,780664,780800,780902,780911,781497,781841,782733,782735-782736,783099,783407,784064,784197,785293,785296,785298-785299,785301,785656,786158,786587,786589,786591-786592,786640,787272,787276,787282-787283,787285,787295,787307,787324,787367,788824-788825,788827-788828,788830,789423,789429,789707,789709-789710,789712,789721,789905,789908,789910,789912,790295,790646-790647,790651,790654-790655,790659,791948,791950,791952,791955,792276,792288,792291,792992,792995,792998,794402,794404,794735,794797 -/cxf/trunk:782181,782728-782730,783097,783396,784059,784181,784895,785279-785282,785468,786142,786271,786395,786582-786583,786638,786647,787269,787277-787279,787290,787305,787323,787366,788060,788187,788703,788774,788820,789371,789420,789527-789529,789704-789705,789896,789898-789900,790294,790637-790644,791354,791538,791753,791947,792261-792263,792684,792975,792985,794297,794396,794728,794778 
+/cxf/branches/2.2.x-fixes:743446,753380,753397,753421,754585,755365,757499,757859,757899,757935,757951,758195,758303,758308,758378,758690,758910,759890,759961,759963-759964,759966,760029,760073,760150,760171,760178,760198,760212,760456,760468,760582,760938,761094,761113,761120,761317,761759,761789,762393,762518,762567,763200,763272,763495,763854,763931,763942,763953,764033-764034,764581,764599-764606,764887,765357,766013,766058,766100-766101,766763,766770,766860,766962-766963,767159,767191,767927,771416,772143,772402,772658,772714,773009-773010,773027,773049,773146,773581,773691,773693,774446-774496,774558,774760,774851,774979,775423,776024-776025,776218,776429,776459,777189,777224,777243,777481,777505,777572,777580,780033,780184,780213,780421,780664,780800,780902,780911,781497,781841,782733,782735-782736,783099,783407,784064,784197,785293,785296,785298-785299,785301,785656,786158,786587,786589,786591-786592,786640,787272,787276,787282-787283,787285,787295,787307,787324,787367,788824-788825,788827-788828,788830,789423,789429,789707,789709-789710,789712,789721,789905,789908,789910,789912,790295,790646-790647,790651,790654-790655,790659,791948,791950,791952,791955,792276,792288,792291,792992,792995,792998,794402,794404,794735,794797,794799 +/cxf/trunk:782181,782728-782730,783097,783396,784059,784181,784895,785279-785282,785468,786142,786271,786395,786582-786583,786638,786647,787269,787277-787279,787290,787305,787323,787366,788060,788187,788703,788774,788820,789371,789420,789527-789529,789704-789705,789896,789898-789900,790294,790637-790644,791354,791538,791753,791947,792261-792263,792684,792975,792985,794297,794396,794728,794778-794779 Propchange: cxf/branches/2.1.x-fixes/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. 
Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=794807&r1=794806&r2=794807&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Thu Jul 16 19:41:56 2009 @@ -81,7 +81,8 @@ private int cacheLevel = DEFAULT_VALUE; private String cacheLevelName; private boolean enforceSpec = true; - + private boolean acceptMessagesWhileStopping; + private ConnectionFactory wrappedConnectionFactory; private JNDIConfiguration jndiConfig; @@ -381,7 +382,15 @@ public void setReconnectOnException(boolean reconnectOnException) { this.reconnectOnException = reconnectOnException; } - + + public boolean isAcceptMessagesWhileStopping() { + return acceptMessagesWhileStopping; + } + + public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { + this.acceptMessagesWhileStopping = acceptMessagesWhileStopping; + } + /** * Tries to creates a ConnectionFactory from jndi if none was set as a property * by using the jndConfig. 
Then it determiens if the connectionFactory should be wrapped Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=794807&r1=794806&r2=794807&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Thu Jul 16 19:41:56 2009 @@ -159,6 +159,9 @@ + ", please set cacheLevel to the value less than " + " org.springframework.jms.listener.DefaultMessageListenerContainer.CACHE_CONSUMER"); } + if (jmsConfig.isAcceptMessagesWhileStopping()) { + jmsListener.setAcceptMessagesWhileStopping(jmsConfig.isAcceptMessagesWhileStopping()); + } String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix(); if (!userCID && messageSelectorPrefix != null && jmsConfig.isUseConduitIdSelector()) { jmsListener.setMessageSelector("JMSCorrelationID LIKE '" Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java?rev=794807&r1=794806&r2=794807&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java Thu Jul 16 19:41:56 2009 @@ -33,8 +33,6 @@ import org.springframework.jms.listener.DefaultMessageListenerContainer; public class 
JMSContinuation implements Continuation { - - static final String BOGUS_MESSAGE_SELECTOR = "orgApacheCxfTransportsJmsContinuations='too-many'"; private Bus bus; private Message inMessage; @@ -43,8 +41,6 @@ private DefaultMessageListenerContainer jmsListener; private JMSConfiguration jmsConfig; - private String currentMessageSelector = BOGUS_MESSAGE_SELECTOR; - private Object userObject; private boolean isNew = true; @@ -163,14 +159,11 @@ // throttle the flow if there're too many continuation instances in memory synchronized (continuations) { modifyList(remove); - if (remove && !BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) { - jmsListener.setMessageSelector(currentMessageSelector); - currentMessageSelector = BOGUS_MESSAGE_SELECTOR; - } else if (!remove && continuations.size() >= jmsConfig.getMaxSuspendedContinuations()) { - currentMessageSelector = jmsListener.getMessageSelector(); - if (!BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) { - jmsListener.setMessageSelector(BOGUS_MESSAGE_SELECTOR); - + if (continuations.size() >= jmsConfig.getMaxSuspendedContinuations()) { + jmsListener.stop(); + } else { + if (!jmsListener.isRunning()) { + jmsListener.start(); } } } @@ -185,9 +178,6 @@ } } - String getCurrentMessageSelector() { - return currentMessageSelector; - } } Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=794807&r1=794806&r2=794807&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Thu Jul 16 19:41:56 2009 @@ -130,6 +130,8 @@ assertEquals("The 
maxConcurrentConsumer should be set", jmsConfig.getMaxConcurrentConsumers(), 5); assertEquals("The maxSuspendedContinuations should be set", jmsConfig.getMaxSuspendedContinuations(), 2); + assertTrue("The acceptMessagesWhileStopping should be set to true", + jmsConfig.isAcceptMessagesWhileStopping()); assertNotNull("The connectionFactory should not be null", jmsConfig.getConnectionFactory()); assertTrue("Should get the instance of ActiveMQConnectionFactory", jmsConfig.getConnectionFactory() instanceof ActiveMQConnectionFactory); Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java?rev=794807&r1=794806&r2=794807&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java Thu Jul 16 19:41:56 2009 @@ -33,6 +33,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.springframework.jms.JmsException; import org.springframework.jms.listener.DefaultMessageListenerContainer; @@ -99,9 +100,9 @@ } @Test - public void testThrottleWithMessageSelector() { + public void testThrottleWithJmsStartAndStop() { - DefaultMessageListenerContainer springContainer = new DefaultMessageListenerContainer(); + DefaultMessageListenerContainerStub springContainer = new DefaultMessageListenerContainerStub(); springContainer.setCacheLevel(2); JMSConfiguration config = new JMSConfiguration(); config.setMaxSuspendedContinuations(1); @@ -110,17 +111,17 @@ new TestJMSContinuationWrapper(b, m, observer, continuations, springContainer, 
config); - assertNull(springContainer.getMessageSelector()); - assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, cw.getCurrentMessageSelector()); + assertFalse(springContainer.isStart()); + assertFalse(springContainer.isStop()); - suspendResumeCheckSelector(cw, springContainer); + suspendResumeCheckStartAndStop(cw, config, springContainer); EasyMock.reset(observer); - suspendResumeCheckSelector(cw, springContainer); + suspendResumeCheckStartAndStop(cw, config, springContainer); } - private void suspendResumeCheckSelector(JMSContinuation cw, - DefaultMessageListenerContainer springContainer) { + private void suspendResumeCheckStartAndStop(JMSContinuation cw, JMSConfiguration config, + DefaultMessageListenerContainerStub springContainer) { try { cw.suspend(5000); fail("SuspendInvocation exception expected"); @@ -129,12 +130,10 @@ } assertEquals(continuations.size(), 1); assertSame(continuations.get(0), cw); + assertTrue(springContainer.isStop()); assertFalse(cw.suspend(1000)); - assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, springContainer.getMessageSelector()); - assertNull(cw.getCurrentMessageSelector()); - observer.onMessage(m); EasyMock.expectLastCall(); EasyMock.replay(observer); @@ -142,10 +141,8 @@ cw.resume(); assertEquals(continuations.size(), 0); + assertTrue(springContainer.isStart()); EasyMock.verify(observer); - - assertNull(springContainer.getMessageSelector()); - assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, cw.getCurrentMessageSelector()); } @Test @@ -191,4 +188,27 @@ return result; } } + + private class DefaultMessageListenerContainerStub extends DefaultMessageListenerContainer { + private boolean start; + private boolean stop; + + public void start() throws JmsException { + this.start = true; + this.stop = false; + } + + public void stop() throws JmsException { + this.stop = true; + this.start = false; + } + + public boolean isStart() { + return this.start; + } + + public boolean isStop() { + return this.stop; + } + } } Modified: 
cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/resources/jms_test_config.xml URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/resources/jms_test_config.xml?rev=794807&r1=794806&r2=794807&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/resources/jms_test_config.xml (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/resources/jms_test_config.xml Thu Jul 16 19:41:56 2009 @@ -109,6 +109,7 @@ p:concurrentConsumers="3" p:maxConcurrentConsumers="5" p:maxSuspendedContinuations="2" + p:acceptMessagesWhileStopping="true" /> <!--
