Repository: activemq Updated Branches: refs/heads/trunk 37eb6b0c6 -> ddf0b2a30
Adds a basic request / response test using temp topic and temp queue for reply to destination. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ddf0b2a3 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ddf0b2a3 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ddf0b2a3 Branch: refs/heads/trunk Commit: ddf0b2a309125ebf5ff3e3a517ef50831810fab1 Parents: 37eb6b0 Author: Timothy Bish <[email protected]> Authored: Fri Mar 7 18:07:58 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Fri Mar 7 18:07:58 2014 -0500 ---------------------------------------------------------------------- .../amqp/JmsClientRequestResponseTest.java | 223 +++++++++++++++++++ 1 file changed, 223 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ddf0b2a3/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java new file mode 100644 index 0000000..9f7b393 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java @@ -0,0 +1,223 @@ +package org.apache.activemq.transport.amqp; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.List; +import java.util.Vector; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JmsClientRequestResponseTest extends AmqpTestSupport implements MessageListener { + + private static final Logger LOG = LoggerFactory.getLogger(JmsClientRequestResponseTest.class); + + @Rule public TestName name = new TestName(); + + private Connection requestorConnection; + private Destination requestDestination; + private Session requestorSession; + + private Connection responderConnection; + private MessageProducer responseProducer; + private Session responderSession; + private Destination replyDestination; + + private final List<JMSException> failures = new Vector<JMSException>(); + private boolean dynamicallyCreateProducer; + private final boolean useAsyncConsumer = true; + private Thread syncThread; + + @Override + @After + public void tearDown() throws Exception { + requestorConnection.close(); + responderConnection.close(); + + if (syncThread != null) { + syncThread.join(5000); + } + + super.tearDown(); + } + + private void doSetupConnections(boolean topic) throws Exception { + responderConnection = createConnection(name.getMethodName() + "-responder"); + responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + if (topic) { + requestDestination = responderSession.createTopic(name.getMethodName()); + } else { + requestDestination = responderSession.createQueue(name.getMethodName()); + } + responseProducer = responderSession.createProducer(null); + + final MessageConsumer requestConsumer = responderSession.createConsumer(requestDestination); + if (useAsyncConsumer) { + requestConsumer.setMessageListener(this); + } else { + syncThread = new Thread(new Runnable() { + @Override + public void run() { + syncConsumeLoop(requestConsumer); + } + }); + syncThread.start(); + } + responderConnection.start(); + + requestorConnection = createConnection(name.getMethodName() + "-requestor"); + requestorSession = requestorConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + if (topic) { + replyDestination = requestorSession.createTemporaryTopic(); + } else { + replyDestination = requestorSession.createTemporaryQueue(); + } + requestorConnection.start(); + } + + @Test(timeout=60000) + public void testRequestResponseToTempQueue() throws Exception { + doSetupConnections(false); + doTestRequestResponse(); + } + + @Test(timeout=60000) + public void testRequestResponseToTempTopic() throws Exception { + doSetupConnections(true); + doTestRequestResponse(); + } + + private void doTestRequestResponse() throws Exception { + + MessageProducer requestProducer = requestorSession.createProducer(requestDestination); + MessageConsumer replyConsumer = requestorSession.createConsumer(replyDestination); + + TextMessage requestMessage = requestorSession.createTextMessage("SomeRequest"); + requestMessage.setJMSReplyTo(replyDestination); + requestProducer.send(requestMessage); + + LOG.info("Sent request to destination: {}", requestDestination.toString()); + + Message msg = replyConsumer.receive(10000); + + if (msg instanceof TextMessage) { + TextMessage replyMessage = (TextMessage)msg; + LOG.info("Received reply."); + LOG.info(replyMessage.toString()); + assertTrue("Wrong message content", replyMessage.getText().startsWith("response")); + } else { + fail("Should have received a reply by now"); + } + replyConsumer.close(); + + assertEquals("Should not have had any failures: " + failures, 0, failures.size()); + } + + /** + * Can be overridden in subclasses to test against a different transport suchs as NIO. + * + * @return the port to connect to on the Broker. + */ + protected int getBrokerPort() { + return port; + } + + private Connection createConnection(String clientId) throws JMSException { + return createConnection(clientId, false, false); + } + + protected Connection createConnection(String clientId, boolean syncPublish, boolean useSsl) throws JMSException { + + int brokerPort = getBrokerPort(); + LOG.debug("Creating connection on port {}", brokerPort); + final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl); + + factory.setSyncPublish(syncPublish); + factory.setTopicPrefix("topic://"); + factory.setQueuePrefix("queue://"); + + final Connection connection = factory.createConnection(); + if (clientId != null && !clientId.isEmpty()) { + connection.setClientID(clientId); + } + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + connection.start(); + return connection; + } + + protected void syncConsumeLoop(MessageConsumer requestConsumer) { + try { + Message message = requestConsumer.receive(5000); + if (message != null) { + onMessage(message); + } else { + LOG.error("No message received"); + } + } catch (JMSException e) { + onException(e); + } + } + + @Override + public void onMessage(Message message) { + try { + TextMessage requestMessage = (TextMessage)message; + + LOG.info("Received request."); + LOG.info(requestMessage.toString()); + + Destination replyDestination = requestMessage.getJMSReplyTo(); + if (replyDestination instanceof Topic) { + LOG.info("Reply destination is: {}", ((Topic)replyDestination).getTopicName()); + } else { + LOG.info("Reply destination is: {}", ((Queue)replyDestination).getQueueName()); + } + + TextMessage replyMessage = responderSession.createTextMessage("response for: " + requestMessage.getText()); + replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID()); + + if (dynamicallyCreateProducer) { + responseProducer = responderSession.createProducer(replyDestination); + responseProducer.send(replyMessage); + } else { + responseProducer.send(replyDestination, replyMessage); + } + + LOG.info("Sent reply."); + LOG.info(replyMessage.toString()); + } catch (JMSException e) { + onException(e); + } + } + + protected void onException(JMSException e) { + LOG.info("Caught: " + e); + e.printStackTrace(); + failures.add(e); + } +}
