Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=692329&r1=692328&r2=692329&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java Thu Sep 4 20:38:07 2008 @@ -30,12 +30,18 @@ import java.util.logging.Logger; import javax.jms.BytesMessage; +import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageProducer; import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueSender; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; @@ -136,7 +142,7 @@ * @param replyTo the ReplyTo destination if any * @return a JMS of the appropriate type populated with the given payload */ - public static Message marshal(Object payload, Session session, Destination replyTo, String messageType) + public static Message createAndSetPayload(Object payload, Session session, String messageType) throws JMSException { Message message = null; @@ -150,32 +156,26 @@ ((ObjectMessage)message).setObject((byte[])payload); } - if (replyTo != null) { - message.setJMSReplyTo(replyTo); - } - return message; } /** - * Unmarshal the payload of an incoming message. + * Extract the payload of an incoming message. * * @param message the incoming message - * @return the unmarshalled message payload, either of type String or byte[] depending on payload type + * @return the message payload as byte[] */ - public static Object unmarshal(Message message) throws JMSException { - Object ret = null; + public static byte[] retrievePayload(Message message) throws JMSException { + byte[] ret = null; if (message instanceof TextMessage) { - ret = ((TextMessage)message).getText(); + ret = ((TextMessage)message).getText().getBytes(); } else if (message instanceof BytesMessage) { - byte[] retBytes = new byte[(int)((BytesMessage)message).getBodyLength()]; - ((BytesMessage)message).readBytes(retBytes); - ret = retBytes; + ret = new byte[(int)((BytesMessage)message).getBodyLength()]; + ((BytesMessage)message).readBytes(ret); } else { ret = (byte[])((ObjectMessage)message).getObject(); } - return ret; } @@ -251,7 +251,7 @@ return headers; } - public static void setContentToProtocalHeader(org.apache.cxf.message.Message message) { + public static void setContentToProtocolHeader(org.apache.cxf.message.Message message) { String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE); Map<String, List<String>> headers = JMSUtils.getSetProtocolHeaders(message); @@ -268,4 +268,107 @@ public static boolean isDestinationStyleQueue(AddressType address) { return JMSConstants.JMS_QUEUE.equals(address.getDestinationStyle().value()); } + + public static Message buildJMSMessageFromCXFMessage(org.apache.cxf.message.Message outMessage, + Object payload, String messageType, Session session, + Destination replyTo, String correlationId) + throws JMSException { + Message jmsMessage = JMSUtils.createAndSetPayload(payload, session, messageType); + + if (replyTo != null) { + jmsMessage.setJMSReplyTo(replyTo); + } + + JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage + .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); + + String correlationID = JMSUtils.getCorrelationId(headers); + + JMSUtils.setMessageProperties(headers, jmsMessage); + // ensure that the contentType is set to the out jms message header + JMSUtils.setContentToProtocolHeader(outMessage); + Map<String, List<String>> protHeaders = CastUtils.cast((Map<?, ?>)outMessage + .get(org.apache.cxf.message.Message.PROTOCOL_HEADERS)); + JMSUtils.addProtocolHeaders(jmsMessage, protHeaders); + if (!outMessage.getExchange().isOneWay()) { + String id = correlationId; + + if (id != null) { + if (correlationID != null) { + String error = "User cannot set JMSCorrelationID when " + + "making a request/reply invocation using " + "a static replyTo Queue."; + throw new JMSException(error); + } + correlationID = id; + } + } + + if (correlationID != null) { + jmsMessage.setJMSCorrelationID(correlationID); + } else { + // No message correlation id is set. Whatever comeback will be accepted as responses. + // We assume that it will only happen in case of the temp. reply queue. + } + return jmsMessage; + } + + public static void sendMessage(MessageProducer producer, Destination destination, Message jmsMessage, + long timeToLive, int deliveryMode, int priority) throws JMSException { + /* + * Can this be changed to producer.send(destination, jmsMessage, deliveryMode, priority, timeToLive); + */ + + if (destination instanceof Queue) { + QueueSender sender = (QueueSender)producer; + sender.setTimeToLive(timeToLive); + sender.send((Queue)destination, jmsMessage, deliveryMode, priority, timeToLive); + } else { + TopicPublisher publisher = (TopicPublisher)producer; + publisher.setTimeToLive(timeToLive); + publisher.publish((Topic)destination, jmsMessage, deliveryMode, priority, timeToLive); + } + } + + public static Destination resolveRequestDestination(Context context, Connection connection, + AddressType addrDetails) throws JMSException, + NamingException { + Destination requestDestination = null; + // see if jndiDestination is set + if (addrDetails.getJndiDestinationName() != null) { + requestDestination = (Destination)context.lookup(addrDetails.getJndiDestinationName()); + } + + // if no jndiDestination or it fails see if jmsDestination is set + // and try to create it. + if (requestDestination == null && addrDetails.getJmsDestinationName() != null) { + if (JMSUtils.isDestinationStyleQueue(addrDetails)) { + requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + .createQueue(addrDetails.getJmsDestinationName()); + } else { + requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + .createTopic(addrDetails.getJmsDestinationName()); + } + } + return requestDestination; + } + + public static Queue resolveReplyDestination(Context context, Connection connection, + AddressType addrDetails) throws NamingException, + JMSException { + Queue replyDestination = null; + + // Reply Destination is used (if present) only if the session is + // point-to-point session + if (JMSUtils.isDestinationStyleQueue(addrDetails)) { + if (addrDetails.getJndiReplyDestinationName() != null) { + replyDestination = (Queue)context.lookup(addrDetails.getJndiReplyDestinationName()); + } + if (replyDestination == null && addrDetails.getJmsReplyDestinationName() != null) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + replyDestination = session.createQueue(addrDetails.getJmsReplyDestinationName()); + session.close(); + } + } + return replyDestination; + } }
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java?rev=692329&r1=692328&r2=692329&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java Thu Sep 4 20:38:07 2008 @@ -19,44 +19,44 @@ package org.apache.cxf.transport.jms; -import javax.jms.Destination; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Calendar; + import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TemporaryQueue; +import javax.jms.TopicSession; /** - * Encapsulates pooled session, unidentified producer, destination & - * associated consumer (certain elements may be null depending on the - * context). + * Encapsulates pooled session, unidentified producer, destination & associated consumer (certain elements may + * be null depending on the context). * <p> - * Currently only the point-to-point domain is supported, - * though the intention is to genericize this to the pub-sub domain - * also. - * + * Currently only the point-to-point domain is supported, though the intention is to genericize this to the + * pub-sub domain also. */ public class PooledSession { - private final Session theSession; - private Destination theDestination; - private final MessageProducer theProducer; + private Session theSession; + private MessageProducer theProducer; private MessageConsumer theConsumer; - + private Queue replyDestination; private String correlationID; + private boolean isQueueStyle; /** * Constructor. */ - PooledSession(Session session, - Destination destination, - MessageProducer producer, - MessageConsumer consumer) { - theSession = session; - theDestination = destination; - theProducer = producer; - theConsumer = consumer; + PooledSession(Session session, boolean isQueueStyle) { + this.theSession = session; + this.isQueueStyle = isQueueStyle; + this.theProducer = null; + this.theConsumer = null; + this.replyDestination = null; } - /** * @return the pooled JMS Session @@ -65,64 +65,83 @@ return theSession; } - - /** - * @return the destination associated with the consumer - */ - Destination destination() { - return theDestination; - } - - - /** - * @param destination the destination to encapsulate - */ - void destination(Destination destination) { - theDestination = destination; - } - - /** * @return the unidentified producer */ MessageProducer producer() { + if (theProducer == null) { + try { + if (isQueueStyle) { + theProducer = ((QueueSession)theSession).createSender(null); + } else { + theProducer = ((TopicSession)theSession).createPublisher(null); + } + } catch (JMSException e) { + throw new RuntimeException(e); + } + } return theProducer; } + private String generateUniqueSelector() { + String host = "localhost"; + + try { + InetAddress addr = InetAddress.getLocalHost(); + host = addr.getHostName(); + } catch (UnknownHostException ukex) { + // Default to localhost. + } + + long time = Calendar.getInstance().getTimeInMillis(); + return host + "_" + System.getProperty("user.name") + "_" + this + time; + } + + MessageConsumer consumer() { + return theConsumer; + } /** * @return the per-destination consumer */ - MessageConsumer consumer() { - return theConsumer; + public void initConsumerAndReplyDestination(Queue destination) { + if (!(theSession instanceof QueueSession)) { + throw new RuntimeException("session should be Queuesession expected"); + } + if (theConsumer == null) { + try { + String selector = null; + if (null != destination) { + replyDestination = destination; + selector = "JMSCorrelationID = '" + generateUniqueSelector() + "'"; + } else { + replyDestination = theSession.createTemporaryQueue(); + } + theConsumer = ((QueueSession)theSession).createReceiver(replyDestination, selector); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } } /** * @return messageSelector if any set. */ - - String getCorrelationID() throws JMSException { + + String getCorrelationID() throws JMSException { if (correlationID == null && theConsumer != null) { - //Must be request/reply + // Must be request/reply String selector = theConsumer.getMessageSelector(); - + if (selector != null && selector.startsWith("JMSCorrelationID")) { int i = selector.indexOf('\''); correlationID = selector.substring(i + 1, selector.length() - 1); - } + } } - - return correlationID; - } - /** - * @param consumer the consumer to encapsulate - */ - void consumer(MessageConsumer consumer) { - theConsumer = consumer; + return correlationID; } - void close() throws JMSException { if (theProducer != null) { theProducer.close(); @@ -132,12 +151,16 @@ theConsumer.close(); } - if (theDestination instanceof TemporaryQueue) { - ((TemporaryQueue)theDestination).delete(); + if (replyDestination instanceof TemporaryQueue) { + ((TemporaryQueue)replyDestination).delete(); } if (theSession != null) { theSession.close(); } } + + public Queue getReplyDestination() { + return replyDestination; + } } Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=692329&r1=692328&r2=692329&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java (original) +++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java Thu Sep 4 20:38:07 2008 @@ -92,13 +92,13 @@ Exchange exchange = new ExchangeImpl(); exchange.setOneWay(isOneWay); message.setExchange(exchange); - exchange.setInMessage(message); + exchange.setOutMessage(message); try { conduit.prepare(message); } catch (IOException ex) { assertFalse("JMSConduit can't perpare to send out message", false); ex.printStackTrace(); - } + } OutputStream os = message.getContent(OutputStream.class); assertTrue("The OutputStream should not be null ", os != null); os.write("HelloWorld".getBytes()); Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=692329&r1=692328&r2=692329&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original) +++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Thu Sep 4 20:38:07 2008 @@ -88,11 +88,8 @@ } public void verifySentMessage(boolean send, Message message) { - PooledSession pooledSession = (PooledSession)message.get(JMSConstants.JMS_POOLEDSESSION); OutputStream os = message.getContent(OutputStream.class); - assertTrue("pooled Session should not be null ", pooledSession != null); - assertTrue("OutputStream should not be null", os != null); - + assertTrue("OutputStream should not be null", os != null); } @Test @@ -140,11 +137,11 @@ JMSConduit conduit = setupJMSConduit(true, false); Message msg = new MessageImpl(); conduit.prepare(msg); - PooledSession sess = conduit.sessionFactory.get(true); + PooledSession sess = conduit.getOrCreateSessionFactory().get(); byte [] b = testMsg.getBytes(); - javax.jms.Message message = JMSUtils.marshal(b, + javax.jms.Message message = JMSUtils.createAndSetPayload(b, sess.session(), - null, JMSConstants.BYTE_MESSAGE_TYPE); + JMSConstants.BYTE_MESSAGE_TYPE); assertTrue("Message should have been of type BytesMessage ", message instanceof BytesMessage); Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=692329&r1=692328&r2=692329&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original) +++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Thu Sep 4 20:38:07 2008 @@ -40,17 +40,17 @@ public class JMSDestinationTest extends AbstractJMSTester { private Message destMessage; - + @BeforeClass public static void createAndStartBroker() throws Exception { startBroker(new JMSBrokerSetup("tcp://localhost:61500")); } - - private void waitForReceiveInMessage() { + + private void waitForReceiveInMessage() { int waitTime = 0; while (inMessage == null && waitTime < 3000) { try { - Thread.sleep(1000); + Thread.sleep(1000); } catch (InterruptedException e) { // do nothing here } @@ -58,12 +58,12 @@ } assertTrue("Can't receive the Conduit Message in 3 seconds", inMessage != null); } - - private void waitForReceiveDestMessage() { + + private void waitForReceiveDestMessage() { int waitTime = 0; while (destMessage == null && waitTime < 3000) { try { - Thread.sleep(1000); + Thread.sleep(1000); } catch (InterruptedException e) { // do nothing here } @@ -71,9 +71,9 @@ } assertTrue("Can't receive the Destination message in 3 seconds", destMessage != null); } - - - + + + public JMSDestination setupJMSDestination(boolean send) throws IOException { ConduitInitiator conduitInitiator = EasyMock.createMock(ConduitInitiator.class); JMSDestination jmsDestination = new JMSDestination(bus, conduitInitiator, endpointInfo); @@ -84,15 +84,15 @@ Exchange exchange = new ExchangeImpl(); exchange.setInMessage(m); m.setExchange(exchange); - destMessage = m; + destMessage = m; } }; jmsDestination.setMessageObserver(observer); } return jmsDestination; } - - @Test + + @Test public void testGetConfigurationFromSpring() throws Exception { SpringBusFactory bf = new SpringBusFactory(); BusFactory.setDefaultBus(null); @@ -122,10 +122,10 @@ "cxf_message_selector", destination.getRuntimePolicy().getMessageSelector()); BusFactory.setDefaultBus(null); - + } - - @Test + + @Test public void testGetConfigurationFormWSDL() throws Exception { SpringBusFactory bf = new SpringBusFactory(); BusFactory.setDefaultBus(null); @@ -135,22 +135,22 @@ "/wsdl/jms_test.wsdl", "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort"); - + JMSDestination destination = setupJMSDestination(false); - + assertEquals("Can't get the right DurableSubscriberName", "CXF_subscriber", destination.getRuntimePolicy().getDurableSubscriberName()); - + assertEquals("Can't get the right AddressPolicy's ConnectionPassword", "dynamicQueues/test.jmstransport.binary", destination.getJMSAddress().getJndiDestinationName()); - + BusFactory.setDefaultBus(null); - + } - - @Test + + @Test public void testDurableSubscriber() throws Exception { SpringBusFactory bf = new SpringBusFactory(); BusFactory.setDefaultBus(null); @@ -158,51 +158,51 @@ BusFactory.setDefaultBus(bus); destMessage = null; inMessage = null; - setupServiceInfo("http://cxf.apache.org/hello_world_jms", - "/wsdl/jms_test.wsdl", - "HelloWorldPubSubService", + setupServiceInfo("http://cxf.apache.org/hello_world_jms", + "/wsdl/jms_test.wsdl", + "HelloWorldPubSubService", "HelloWorldPubSubPort"); JMSConduit conduit = setupJMSConduit(true, false); Message outMessage = new MessageImpl(); setupMessageHeader(outMessage); JMSDestination destination = null; try { - destination = setupJMSDestination(true); - //destination.activate(); + destination = setupJMSDestination(true); + destination.activate(); } catch (IOException e) { - assertFalse("The JMSDestination activate should not through exception ", false); + assertFalse("The JMSDestination activate should not through exception ", false); e.printStackTrace(); - } - sendoutMessage(conduit, outMessage, true); + } + sendoutMessage(conduit, outMessage, true); // wait for the message to be get from the destination - waitForReceiveDestMessage(); + waitForReceiveDestMessage(); // just verify the Destination inMessage assertTrue("The destiantion should have got the message ", destMessage != null); verifyReceivedMessage(destMessage); verifyHeaders(destMessage, outMessage); destination.shutdown(); } - - @Test + + @Test public void testOneWayDestination() throws Exception { destMessage = null; inMessage = null; - setupServiceInfo("http://cxf.apache.org/hello_world_jms", - "/wsdl/jms_test.wsdl", - "HWStaticReplyQBinMsgService", + setupServiceInfo("http://cxf.apache.org/hello_world_jms", + "/wsdl/jms_test.wsdl", + "HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort"); JMSConduit conduit = setupJMSConduit(true, false); Message outMessage = new MessageImpl(); setupMessageHeader(outMessage); JMSDestination destination = null; try { - destination = setupJMSDestination(true); - //destination.activate(); + destination = setupJMSDestination(true); + destination.activate(); } catch (IOException e) { - assertFalse("The JMSDestination activate should not through exception ", false); + assertFalse("The JMSDestination activate should not throw exception ", false); e.printStackTrace(); - } - sendoutMessage(conduit, outMessage, true); + } + sendoutMessage(conduit, outMessage, true); // wait for the message to be get from the destination waitForReceiveDestMessage(); // just verify the Destination inMessage @@ -211,10 +211,10 @@ verifyHeaders(destMessage, outMessage); destination.shutdown(); } - + private void setupMessageHeader(Message outMessage) { JMSMessageHeadersType header = new JMSMessageHeadersType(); - header.setJMSCorrelationID("Destination test"); + header.setJMSCorrelationID("Destination test"); header.setJMSDeliveryMode(3); header.setJMSPriority(1); header.setTimeToLive(1000); @@ -222,7 +222,7 @@ } private void verifyReceivedMessage(Message inMessage) { - ByteArrayInputStream bis = + ByteArrayInputStream bis = (ByteArrayInputStream) inMessage.getContent(InputStream.class); byte bytes[] = new byte[bis.available()]; try { @@ -234,67 +234,67 @@ String reponse = IOUtils.newStringFromBytes(bytes); assertEquals("The reponse date should be equals", reponse, "HelloWorld"); } - + private void verifyRequestResponseHeaders(Message inMessage, Message outMessage) { JMSMessageHeadersType outHeader = (JMSMessageHeadersType)outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); - + JMSMessageHeadersType inHeader = - (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); - + (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); + verifyJmsHeaderEquality(outHeader, inHeader); - + } - + private void verifyHeaders(Message inMessage, Message outMessage) { JMSMessageHeadersType outHeader = (JMSMessageHeadersType)outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); - + JMSMessageHeadersType inHeader = - (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); - + (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); + verifyJmsHeaderEquality(outHeader, inHeader); - + } private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) { - assertEquals("The inMessage and outMessage JMS Header's CorrelationID should be equals", + assertEquals("The inMessage and outMessage JMS Header's CorrelationID should be equals", outHeader.getJMSCorrelationID(), inHeader.getJMSCorrelationID()); - assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", + assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader.getJMSPriority(), inHeader.getJMSPriority()); - assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", + assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader.getJMSType(), inHeader.getJMSType()); - + } - - - - @Test + + + + @Test public void testRoundTripDestination() throws Exception { - + inMessage = null; - setupServiceInfo("http://cxf.apache.org/hello_world_jms", - "/wsdl/jms_test.wsdl", - "HelloWorldService", + setupServiceInfo("http://cxf.apache.org/hello_world_jms", + "/wsdl/jms_test.wsdl", + "HelloWorldService", "HelloWorldPort"); - //set up the conduit send to be true + //set up the conduit send to be true JMSConduit conduit = setupJMSConduit(true, false); final Message outMessage = new MessageImpl(); setupMessageHeader(outMessage); final JMSDestination destination = setupJMSDestination(true); - - //set up MessageObserver for handlering the conduit message + + //set up MessageObserver for handling the conduit message MessageObserver observer = new MessageObserver() { - public void onMessage(Message m) { + public void onMessage(Message m) { Exchange exchange = new ExchangeImpl(); exchange.setInMessage(m); m.setExchange(exchange); verifyReceivedMessage(m); verifyHeaders(m, outMessage); - //setup the message for + //setup the message for Conduit backConduit; try { - backConduit = destination.getBackChannel(m, null, null); + backConduit = destination.getBackChannel(m, null, null); //wait for the message to be got from the conduit Message replyMessage = new MessageImpl(); sendoutMessage(backConduit, replyMessage, true); @@ -304,59 +304,65 @@ } } }; - destination.setMessageObserver(observer); + destination.setMessageObserver(observer); //set is oneway false for get response from destination - sendoutMessage(conduit, outMessage, false); - //wait for the message to be got from the destination, - // create the thread to handler the Destination incomming message - + sendoutMessage(conduit, outMessage, false); + //wait for the message to be got from the destination, + // create the thread to handler the Destination incoming message + waitForReceiveInMessage(); verifyReceivedMessage(inMessage); // wait for a while for the jms session recycling + + // Send a second message to check for an issue + // Where the session was closed the second time + sendoutMessage(conduit, outMessage, false); + waitForReceiveInMessage(); + verifyReceivedMessage(inMessage); + Thread.sleep(1000); destination.shutdown(); } - - @Test + @Test public void testPropertyExclusion() throws Exception { - - final String customPropertyName = + + final String customPropertyName = "THIS_PROPERTY_WILL_NOT_BE_AUTO_COPIED"; inMessage = null; - setupServiceInfo("http://cxf.apache.org/hello_world_jms", - "/wsdl/jms_test.wsdl", - "HelloWorldService", + setupServiceInfo("http://cxf.apache.org/hello_world_jms", + "/wsdl/jms_test.wsdl", + "HelloWorldService", "HelloWorldPort"); - //set up the conduit send to be true + //set up the conduit send to be true JMSConduit conduit = setupJMSConduit(true, false); final Message outMessage = new MessageImpl(); setupMessageHeader(outMessage); - + JMSPropertyType excludeProp = new JMSPropertyType(); excludeProp.setName(customPropertyName); excludeProp.setValue(customPropertyName); - + JMSMessageHeadersType headers = (JMSMessageHeadersType) outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); headers.getProperty().add(excludeProp); - + final JMSDestination destination = setupJMSDestination(true); - - //set up MessageObserver for handlering the conduit message + + //set up MessageObserver for handling the conduit message MessageObserver observer = new MessageObserver() { - public void onMessage(Message m) { + public void onMessage(Message m) { Exchange exchange = new ExchangeImpl(); exchange.setInMessage(m); m.setExchange(exchange); verifyReceivedMessage(m); verifyHeaders(m, outMessage); - //setup the message for + //setup the message for Conduit backConduit; try { - backConduit = destination.getBackChannel(m, null, null); + backConduit = destination.getBackChannel(m, null, null); //wait for the message to be got from the conduit Message replyMessage = new MessageImpl(); sendoutMessage(backConduit, replyMessage, true); @@ -366,18 +372,18 @@ } } }; - destination.setMessageObserver(observer); + destination.setMessageObserver(observer); //set is oneway false for get response from destination - sendoutMessage(conduit, outMessage, false); - //wait for the message to be got from the destination, - // create the thread to handler the Destination incomming message - + sendoutMessage(conduit, outMessage, false); + //wait for the message to be got from the destination, + // create the thread to handler the Destination incoming message + waitForReceiveInMessage(); verifyReceivedMessage(inMessage); - - + + verifyRequestResponseHeaders(inMessage, outMessage); - + JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); @@ -389,13 +395,13 @@ Thread.sleep(1000); destination.shutdown(); } - + @Test public void testIsMultiplexCapable() throws Exception { inMessage = null; - setupServiceInfo("http://cxf.apache.org/hello_world_jms", - "/wsdl/jms_test.wsdl", - "HelloWorldService", + setupServiceInfo("http://cxf.apache.org/hello_world_jms", + "/wsdl/jms_test.wsdl", + "HelloWorldService", "HelloWorldPort"); final JMSDestination destination = setupJMSDestination(true); assertTrue("is multiplex", destination instanceof MultiplexDestination); Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledSessionTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledSessionTest.java?rev=692329&r1=692328&r2=692329&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledSessionTest.java (original) +++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledSessionTest.java Thu Sep 4 20:38:07 2008 @@ -19,12 +19,6 @@ package org.apache.cxf.transport.jms; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.easymock.classextension.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -33,29 +27,6 @@ @Test public void testPooledSession() throws Exception { - Session sess = EasyMock.createMock(Session.class); - Destination dest = EasyMock.createMock(Destination.class); - MessageProducer mproducer = EasyMock.createMock(MessageProducer.class); - MessageConsumer mconsumer = EasyMock.createMock(MessageConsumer.class); - - PooledSession ps = new PooledSession(sess, dest, mproducer, mconsumer); - - assertTrue(ps.session().equals(sess)); - assertTrue(ps.destination().equals(dest)); - assertTrue(ps.consumer().equals(mconsumer)); - assertTrue(ps.producer().equals(mproducer)); - - MessageConsumer mcons = EasyMock.createMock(MessageConsumer.class); - assertFalse(mconsumer.equals(mcons)); - - ps.consumer(mcons); - - assertTrue(ps.consumer().equals(mcons)); - - Destination mdest = EasyMock.createMock(Destination.class); - assertFalse(dest.equals(mdest)); - - ps.destination(mdest); - assertTrue(mdest.equals(ps.destination())); + // TODO This has to be rewritten as PooledSession now works differently } }
