Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java?rev=692371&r1=692370&r2=692371&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java (original) +++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java Fri Sep 5 00:46:53 2008 @@ -19,44 +19,44 @@ package org.apache.cxf.transport.jms; -import javax.jms.Destination; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Calendar; + import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TemporaryQueue; +import javax.jms.TopicSession; /** - * Encapsulates pooled session, unidentified producer, destination & - * associated consumer (certain elements may be null depending on the - * context). + * Encapsulates pooled session, unidentified producer, destination & associated consumer (certain elements may + * be null depending on the context). * <p> - * Currently only the point-to-point domain is supported, - * though the intention is to genericize this to the pub-sub domain - * also. - * + * Currently only the point-to-point domain is supported, though the intention is to genericize this to the + * pub-sub domain also. */ public class PooledSession { - private final Session theSession; - private Destination theDestination; - private final MessageProducer theProducer; + private Session theSession; + private MessageProducer theProducer; private MessageConsumer theConsumer; - + private Queue replyDestination; private String correlationID; + private boolean isQueueStyle; /** * Constructor. */ - PooledSession(Session session, - Destination destination, - MessageProducer producer, - MessageConsumer consumer) { - theSession = session; - theDestination = destination; - theProducer = producer; - theConsumer = consumer; + PooledSession(Session session, boolean isQueueStyle) { + this.theSession = session; + this.isQueueStyle = isQueueStyle; + this.theProducer = null; + this.theConsumer = null; + this.replyDestination = null; } - /** * @return the pooled JMS Session @@ -65,64 +65,83 @@ return theSession; } - - /** - * @return the destination associated with the consumer - */ - Destination destination() { - return theDestination; - } - - - /** - * @param destination the destination to encapsulate - */ - void destination(Destination destination) { - theDestination = destination; - } - - /** * @return the unidentified producer */ MessageProducer producer() { + if (theProducer == null) { + try { + if (isQueueStyle) { + theProducer = ((QueueSession)theSession).createSender(null); + } else { + theProducer = ((TopicSession)theSession).createPublisher(null); + } + } catch (JMSException e) { + throw new RuntimeException(e); + } + } return theProducer; } + private String generateUniqueSelector() { + String host = "localhost"; + + try { + InetAddress addr = InetAddress.getLocalHost(); + host = addr.getHostName(); + } catch (UnknownHostException ukex) { + // Default to localhost. + } + + long time = Calendar.getInstance().getTimeInMillis(); + return host + "_" + System.getProperty("user.name") + "_" + this + time; + } + + MessageConsumer consumer() { + return theConsumer; + } /** * @return the per-destination consumer */ - MessageConsumer consumer() { - return theConsumer; + public void initConsumerAndReplyDestination(Queue destination) { + if (!(theSession instanceof QueueSession)) { + throw new RuntimeException("session should be Queuesession expected"); + } + if (theConsumer == null) { + try { + String selector = null; + if (null != destination) { + replyDestination = destination; + selector = "JMSCorrelationID = '" + generateUniqueSelector() + "'"; + } else { + replyDestination = theSession.createTemporaryQueue(); + } + theConsumer = ((QueueSession)theSession).createReceiver(replyDestination, selector); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } } /** * @return messageSelector if any set. */ - - String getCorrelationID() throws JMSException { + + String getCorrelationID() throws JMSException { if (correlationID == null && theConsumer != null) { - //Must be request/reply + // Must be request/reply String selector = theConsumer.getMessageSelector(); - + if (selector != null && selector.startsWith("JMSCorrelationID")) { int i = selector.indexOf('\''); correlationID = selector.substring(i + 1, selector.length() - 1); - } + } } - - return correlationID; - } - /** - * @param consumer the consumer to encapsulate - */ - void consumer(MessageConsumer consumer) { - theConsumer = consumer; + return correlationID; } - void close() throws JMSException { if (theProducer != null) { theProducer.close(); @@ -132,12 +151,16 @@ theConsumer.close(); } - if (theDestination instanceof TemporaryQueue) { - ((TemporaryQueue)theDestination).delete(); + if (replyDestination instanceof TemporaryQueue) { + ((TemporaryQueue)replyDestination).delete(); } if (theSession != null) { theSession.close(); } } + + public Queue getReplyDestination() { + return replyDestination; + } }
Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=692371&r1=692370&r2=692371&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java (original) +++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java Fri Sep 5 00:46:53 2008 @@ -92,13 +92,13 @@ Exchange exchange = new ExchangeImpl(); exchange.setOneWay(isOneWay); message.setExchange(exchange); - exchange.setInMessage(message); + exchange.setOutMessage(message); try { conduit.prepare(message); } catch (IOException ex) { assertFalse("JMSConduit can't perpare to send out message", false); ex.printStackTrace(); - } + } OutputStream os = message.getContent(OutputStream.class); assertTrue("The OutputStream should not be null ", os != null); os.write("HelloWorld".getBytes()); Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=692371&r1=692370&r2=692371&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original) +++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Fri Sep 5 00:46:53 2008 @@ -87,11 +87,8 @@ } public void verifySentMessage(boolean send, Message message) { - PooledSession pooledSession = (PooledSession)message.get(JMSConstants.JMS_POOLEDSESSION); OutputStream os = message.getContent(OutputStream.class); - assertTrue("pooled Session should not be null ", pooledSession != null); - assertTrue("OutputStream should not be null", os != null); - + assertTrue("OutputStream should not be null", os != null); } @Test @@ -139,11 +136,11 @@ JMSConduit conduit = setupJMSConduit(true, false); Message msg = new MessageImpl(); conduit.prepare(msg); - PooledSession sess = conduit.sessionFactory.get(true); + PooledSession sess = conduit.getOrCreateSessionFactory().get(); byte [] b = testMsg.getBytes(); - javax.jms.Message message = JMSUtils.marshal(b, + javax.jms.Message message = JMSUtils.createAndSetPayload(b, sess.session(), - null, JMSConstants.BYTE_MESSAGE_TYPE); + JMSConstants.BYTE_MESSAGE_TYPE); assertTrue("Message should have been of type BytesMessage ", message instanceof BytesMessage); Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=692371&r1=692370&r2=692371&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original) +++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Fri Sep 5 00:46:53 2008 @@ -39,17 +39,17 @@ public class JMSDestinationTest extends AbstractJMSTester { private Message destMessage; - + @BeforeClass public static void createAndStartBroker() throws Exception { startBroker(new JMSBrokerSetup("tcp://localhost:61500")); } - - private void waitForReceiveInMessage() { + + private void waitForReceiveInMessage() { int waitTime = 0; while (inMessage == null && waitTime < 3000) { try { - Thread.sleep(1000); + Thread.sleep(1000); } catch (InterruptedException e) { // do nothing here } @@ -57,12 +57,12 @@ } assertTrue("Can't receive the Conduit Message in 3 seconds", inMessage != null); } - - private void waitForReceiveDestMessage() { + + private void waitForReceiveDestMessage() { int waitTime = 0; while (destMessage == null && waitTime < 3000) { try { - Thread.sleep(1000); + Thread.sleep(1000); } catch (InterruptedException e) { // do nothing here } @@ -70,9 +70,9 @@ } assertTrue("Can't receive the Destination message in 3 seconds", destMessage != null); } - - - + + + public JMSDestination setupJMSDestination(boolean send) throws IOException { ConduitInitiator conduitInitiator = EasyMock.createMock(ConduitInitiator.class); JMSDestination jmsDestination = new JMSDestination(bus, conduitInitiator, endpointInfo); @@ -83,15 +83,15 @@ Exchange exchange = new ExchangeImpl(); exchange.setInMessage(m); m.setExchange(exchange); - destMessage = m; + destMessage = m; } }; jmsDestination.setMessageObserver(observer); } return jmsDestination; } - - @Test + + @Test public void testGetConfigurationFromSpring() throws Exception { SpringBusFactory bf = new SpringBusFactory(); BusFactory.setDefaultBus(null); @@ -121,10 +121,10 @@ "cxf_message_selector", destination.getRuntimePolicy().getMessageSelector()); BusFactory.setDefaultBus(null); - + } - - @Test + + @Test public void testGetConfigurationFormWSDL() throws Exception { SpringBusFactory bf = new SpringBusFactory(); BusFactory.setDefaultBus(null); @@ -134,22 +134,22 @@ "/wsdl/jms_test.wsdl", "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort"); - + JMSDestination destination = setupJMSDestination(false); - + assertEquals("Can't get the right DurableSubscriberName", "CXF_subscriber", destination.getRuntimePolicy().getDurableSubscriberName()); - + assertEquals("Can't get the right AddressPolicy's ConnectionPassword", "dynamicQueues/test.jmstransport.binary", destination.getJMSAddress().getJndiDestinationName()); - + BusFactory.setDefaultBus(null); - + } - - @Test + + @Test public void testDurableSubscriber() throws Exception { SpringBusFactory bf = new SpringBusFactory(); BusFactory.setDefaultBus(null); @@ -157,51 +157,51 @@ BusFactory.setDefaultBus(bus); destMessage = null; inMessage = null; - setupServiceInfo("http://cxf.apache.org/hello_world_jms", - "/wsdl/jms_test.wsdl", - "HelloWorldPubSubService", + setupServiceInfo("http://cxf.apache.org/hello_world_jms", + "/wsdl/jms_test.wsdl", + "HelloWorldPubSubService", "HelloWorldPubSubPort"); JMSConduit conduit = setupJMSConduit(true, false); Message outMessage = new MessageImpl(); setupMessageHeader(outMessage); JMSDestination destination = null; try { - destination = setupJMSDestination(true); - //destination.activate(); + destination = setupJMSDestination(true); + destination.activate(); } catch (IOException e) { - assertFalse("The JMSDestination activate should not through exception ", false); + assertFalse("The JMSDestination activate should not through exception ", false); e.printStackTrace(); - } - sendoutMessage(conduit, outMessage, true); + } + sendoutMessage(conduit, outMessage, true); // wait for the message to be get from the destination - waitForReceiveDestMessage(); + waitForReceiveDestMessage(); // just verify the Destination inMessage assertTrue("The destiantion should have got the message ", destMessage != null); verifyReceivedMessage(destMessage); verifyHeaders(destMessage, outMessage); destination.shutdown(); } - - @Test + + @Test public void testOneWayDestination() throws Exception { destMessage = null; inMessage = null; - setupServiceInfo("http://cxf.apache.org/hello_world_jms", - "/wsdl/jms_test.wsdl", - "HWStaticReplyQBinMsgService", + setupServiceInfo("http://cxf.apache.org/hello_world_jms", + "/wsdl/jms_test.wsdl", + "HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort"); JMSConduit conduit = setupJMSConduit(true, false); Message outMessage = new MessageImpl(); setupMessageHeader(outMessage); JMSDestination destination = null; try { - destination = setupJMSDestination(true); - //destination.activate(); + destination = setupJMSDestination(true); + destination.activate(); } catch (IOException e) { - assertFalse("The JMSDestination activate should not through exception ", false); + assertFalse("The JMSDestination activate should not throw exception ", false); e.printStackTrace(); - } - sendoutMessage(conduit, outMessage, true); + } + sendoutMessage(conduit, outMessage, true); // wait for the message to be get from the destination waitForReceiveDestMessage(); // just verify the Destination inMessage @@ -210,10 +210,10 @@ verifyHeaders(destMessage, outMessage); destination.shutdown(); } - + private void setupMessageHeader(Message outMessage) { JMSMessageHeadersType header = new JMSMessageHeadersType(); - header.setJMSCorrelationID("Destination test"); + header.setJMSCorrelationID("Destination test"); header.setJMSDeliveryMode(3); header.setJMSPriority(1); header.setTimeToLive(1000); @@ -221,7 +221,7 @@ } private void verifyReceivedMessage(Message inMessage) { - ByteArrayInputStream bis = + ByteArrayInputStream bis = (ByteArrayInputStream) inMessage.getContent(InputStream.class); byte bytes[] = new byte[bis.available()]; try { @@ -233,67 +233,67 @@ String reponse = new String(bytes); assertEquals("The reponse date should be equals", reponse, "HelloWorld"); } - + private void verifyRequestResponseHeaders(Message inMessage, Message outMessage) { JMSMessageHeadersType outHeader = (JMSMessageHeadersType)outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); - + JMSMessageHeadersType inHeader = - (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); - + (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); + verifyJmsHeaderEquality(outHeader, inHeader); - + } - + private void verifyHeaders(Message inMessage, Message outMessage) { JMSMessageHeadersType outHeader = (JMSMessageHeadersType)outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); - + JMSMessageHeadersType inHeader = - (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); - + (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); + verifyJmsHeaderEquality(outHeader, inHeader); - + } private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) { - assertEquals("The inMessage and outMessage JMS Header's CorrelationID should be equals", + assertEquals("The inMessage and outMessage JMS Header's CorrelationID should be equals", outHeader.getJMSCorrelationID(), inHeader.getJMSCorrelationID()); - assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", + assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader.getJMSPriority(), inHeader.getJMSPriority()); - assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", + assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader.getJMSType(), inHeader.getJMSType()); - + } - - - - @Test + + + + @Test public void testRoundTripDestination() throws Exception { - + inMessage = null; - setupServiceInfo("http://cxf.apache.org/hello_world_jms", - "/wsdl/jms_test.wsdl", - "HelloWorldService", + setupServiceInfo("http://cxf.apache.org/hello_world_jms", + "/wsdl/jms_test.wsdl", + "HelloWorldService", "HelloWorldPort"); - //set up the conduit send to be true + //set up the conduit send to be true JMSConduit conduit = setupJMSConduit(true, false); final Message outMessage = new MessageImpl(); setupMessageHeader(outMessage); final JMSDestination destination = setupJMSDestination(true); - - //set up MessageObserver for handlering the conduit message + + //set up MessageObserver for handling the conduit message MessageObserver observer = new MessageObserver() { - public void onMessage(Message m) { + public void onMessage(Message m) { Exchange exchange = new ExchangeImpl(); exchange.setInMessage(m); m.setExchange(exchange); verifyReceivedMessage(m); verifyHeaders(m, outMessage); - //setup the message for + //setup the message for Conduit backConduit; try { - backConduit = destination.getBackChannel(m, null, null); + backConduit = destination.getBackChannel(m, null, null); //wait for the message to be got from the conduit Message replyMessage = new MessageImpl(); sendoutMessage(backConduit, replyMessage, true); @@ -303,59 +303,65 @@ } } }; - destination.setMessageObserver(observer); + destination.setMessageObserver(observer); //set is oneway false for get response from destination - sendoutMessage(conduit, outMessage, false); - //wait for the message to be got from the destination, - // create the thread to handler the Destination incomming message - + sendoutMessage(conduit, outMessage, false); + //wait for the message to be got from the destination, + // create the thread to handler the Destination incoming message + waitForReceiveInMessage(); verifyReceivedMessage(inMessage); // wait for a while for the jms session recycling + + // Send a second message to check for an issue + // Where the session was closed the second time + sendoutMessage(conduit, outMessage, false); + waitForReceiveInMessage(); + verifyReceivedMessage(inMessage); + Thread.sleep(1000); destination.shutdown(); } - - @Test + @Test public void testPropertyExclusion() throws Exception { - - final String customPropertyName = + + final String customPropertyName = "THIS_PROPERTY_WILL_NOT_BE_AUTO_COPIED"; inMessage = null; - setupServiceInfo("http://cxf.apache.org/hello_world_jms", - "/wsdl/jms_test.wsdl", - "HelloWorldService", + setupServiceInfo("http://cxf.apache.org/hello_world_jms", + "/wsdl/jms_test.wsdl", + "HelloWorldService", "HelloWorldPort"); - //set up the conduit send to be true + //set up the conduit send to be true JMSConduit conduit = setupJMSConduit(true, false); final Message outMessage = new MessageImpl(); setupMessageHeader(outMessage); - + JMSPropertyType excludeProp = new JMSPropertyType(); excludeProp.setName(customPropertyName); excludeProp.setValue(customPropertyName); - + JMSMessageHeadersType headers = (JMSMessageHeadersType) outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); headers.getProperty().add(excludeProp); - + final JMSDestination destination = setupJMSDestination(true); - - //set up MessageObserver for handlering the conduit message + + //set up MessageObserver for handling the conduit message MessageObserver observer = new MessageObserver() { - public void onMessage(Message m) { + public void onMessage(Message m) { Exchange exchange = new ExchangeImpl(); exchange.setInMessage(m); m.setExchange(exchange); verifyReceivedMessage(m); verifyHeaders(m, outMessage); - //setup the message for + //setup the message for Conduit backConduit; try { - backConduit = destination.getBackChannel(m, null, null); + backConduit = destination.getBackChannel(m, null, null); //wait for the message to be got from the conduit Message replyMessage = new MessageImpl(); sendoutMessage(backConduit, replyMessage, true); @@ -365,18 +371,18 @@ } } }; - destination.setMessageObserver(observer); + destination.setMessageObserver(observer); //set is oneway false for get response from destination - sendoutMessage(conduit, outMessage, false); - //wait for the message to be got from the destination, - // create the thread to handler the Destination incomming message - + sendoutMessage(conduit, outMessage, false); + //wait for the message to be got from the destination, + // create the thread to handler the Destination incoming message + waitForReceiveInMessage(); verifyReceivedMessage(inMessage); - - + + verifyRequestResponseHeaders(inMessage, outMessage); - + JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); @@ -390,13 +396,13 @@ Thread.sleep(1000); destination.shutdown(); } - + @Test public void testIsMultiplexCapable() throws Exception { inMessage = null; - setupServiceInfo("http://cxf.apache.org/hello_world_jms", - "/wsdl/jms_test.wsdl", - "HelloWorldService", + setupServiceInfo("http://cxf.apache.org/hello_world_jms", + "/wsdl/jms_test.wsdl", + "HelloWorldService", "HelloWorldPort"); final JMSDestination destination = setupJMSDestination(true); assertTrue("is multiplex", destination instanceof MultiplexDestination); Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledSessionTest.java URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledSessionTest.java?rev=692371&r1=692370&r2=692371&view=diff ============================================================================== --- cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledSessionTest.java (original) +++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledSessionTest.java Fri Sep 5 00:46:53 2008 @@ -19,12 +19,6 @@ package org.apache.cxf.transport.jms; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.easymock.classextension.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -33,29 +27,6 @@ @Test public void testPooledSession() throws Exception { - Session sess = EasyMock.createMock(Session.class); - Destination dest = EasyMock.createMock(Destination.class); - MessageProducer mproducer = EasyMock.createMock(MessageProducer.class); - MessageConsumer mconsumer = EasyMock.createMock(MessageConsumer.class); - - PooledSession ps = new PooledSession(sess, dest, mproducer, mconsumer); - - assertTrue(ps.session().equals(sess)); - assertTrue(ps.destination().equals(dest)); - assertTrue(ps.consumer().equals(mconsumer)); - assertTrue(ps.producer().equals(mproducer)); - - MessageConsumer mcons = EasyMock.createMock(MessageConsumer.class); - assertFalse(mconsumer.equals(mcons)); - - ps.consumer(mcons); - - assertTrue(ps.consumer().equals(mcons)); - - Destination mdest = EasyMock.createMock(Destination.class); - assertFalse(dest.equals(mdest)); - - ps.destination(mdest); - assertTrue(mdest.equals(ps.destination())); + // TODO This has to be rewritten as PooledSession now works differently } }
