Author: kwall
Date: Tue Feb 7 11:31:14 2012
New Revision: 1241431
URL: http://svn.apache.org/viewvc?rev=1241431&view=rev
Log:
QPID-3818: Replace CombinedTest and its support classes with a simpler
test-case.
Added:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java
Removed:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Client.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/CombinedTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/Service.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/ServiceCreator.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
Added:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java?rev=1241431&view=auto
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java
(added)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSReplyToTest.java
Tue Feb 7 11:31:14 2012
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.client.message;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Tests that {@link Message#setJMSReplyTo(Destination)} can be used to pass a {@link Destination} between
+ * messaging clients as is commonly used in request/response messaging pattern implementations.
+ * <p>
+ * Each test sends a request carrying a JMSReplyTo destination to a request queue. An asynchronous
+ * {@link MessageListener}, registered in {@link #setUp()} on a session of its own, answers on that
+ * destination with a message whose correlation id is the message id of the request.
+ */
+public class JMSReplyToTest extends QpidBrokerTestCase
+{
+    /** Failure raised inside the asynchronous responder, or null if it has not failed. */
+    private final AtomicReference<Throwable> _caughtException = new AtomicReference<Throwable>();
+    /** Queue the responder listens on; requests are sent here. */
+    private Queue _requestQueue;
+    /** Connection used by the requesting side of the test. */
+    private Connection _connection;
+    /** Non-transacted, auto-acknowledge session used by the requesting side of the test. */
+    private Session _session;
+
+    /**
+     * Starts the asynchronous responder, then opens and starts the requester's connection and session.
+     *
+     * @throws Exception if the superclass set-up or any JMS operation fails
+     */
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        _requestQueue = startAsyncRespondingJmsConsumerOnSeparateConnection();
+
+        _connection = getConnection();
+        _connection.start();
+        _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Request/response round trip where the JMSReplyTo destination is a named queue.
+     */
+    public void testRequestResponseUsingJmsReplyTo() throws Exception
+    {
+        final String responseQueueName = getTestQueueName() + ".response";
+        Queue replyToQueue = _session.createQueue(responseQueueName);
+        sendRequestAndValidateResponse(replyToQueue);
+    }
+
+    /**
+     * Request/response round trip where the JMSReplyTo destination is a {@link TemporaryQueue}.
+     */
+    public void testRequestResponseUsingTemporaryJmsReplyTo() throws Exception
+    {
+        TemporaryQueue replyToQueue = _session.createTemporaryQueue();
+
+        sendRequestAndValidateResponse(replyToQueue);
+    }
+
+    /**
+     * Sends a request whose JMSReplyTo is the given queue and verifies that the responder answers on it.
+     *
+     * @param replyToQueue queue on which the response is expected
+     * @throws Exception if any JMS operation fails
+     */
+    private void sendRequestAndValidateResponse(Queue replyToQueue) throws Exception
+    {
+        // Subscribe to the reply queue before sending so the consumer exists when the response is produced.
+        MessageConsumer replyConsumer = _session.createConsumer(replyToQueue);
+
+        Message requestMessage = createRequestMessageWithJmsReplyTo(_session, replyToQueue);
+        sendRequest(_requestQueue, _session, requestMessage);
+
+        Message responseMessage = replyConsumer.receive(RECEIVE_TIMEOUT);
+
+        // Check the responder first: if it failed, no response arrives, and reporting the recorded
+        // throwable is more useful than the generic "not received" failure below.
+        assertNull("Async responder caught unexpected exception", _caughtException.get());
+
+        validateResponse(responseMessage, requestMessage);
+    }
+
+    /**
+     * Creates a text request message with its JMSReplyTo header set.
+     *
+     * @param session session used to create the message
+     * @param replyToQueue destination to place in the JMSReplyTo header
+     * @return the new request message
+     * @throws JMSException if the message cannot be created or the header cannot be set
+     */
+    private Message createRequestMessageWithJmsReplyTo(Session session, Queue replyToQueue)
+            throws JMSException
+    {
+        Message requestMessage = session.createTextMessage("My request");
+        requestMessage.setJMSReplyTo(replyToQueue);
+        return requestMessage;
+    }
+
+    /**
+     * Sends the request to the request queue using a short-lived producer.
+     *
+     * @param requestQueue queue to send to
+     * @param session session used to create the producer
+     * @param requestMessage message to send
+     * @throws JMSException if the producer cannot be created, used or closed
+     */
+    private void sendRequest(final Queue requestQueue, Session session, Message requestMessage) throws JMSException
+    {
+        MessageProducer producer = session.createProducer(requestQueue);
+        try
+        {
+            producer.send(requestMessage);
+        }
+        finally
+        {
+            // The producer is only needed for this one send.
+            producer.close();
+        }
+    }
+
+    /**
+     * Asserts that a response arrived and that it is correlated to the request.
+     *
+     * @param responseMessage message received from the reply queue, or null if the receive timed out
+     * @param requestMessage the request that was sent
+     * @throws JMSException if a message header cannot be read
+     */
+    private void validateResponse(Message responseMessage, Message requestMessage) throws JMSException
+    {
+        assertNotNull("Response message not received", responseMessage);
+        // Expected value first, actual value second, so a failure report reads the right way round.
+        assertEquals("Correlation id of the response should match message id of the request",
+                requestMessage.getJMSMessageID(), responseMessage.getJMSCorrelationID());
+    }
+
+    /**
+     * Obtains and starts a connection for the responder, creates the request queue on it and registers an
+     * {@link AsyncResponder} as the queue's message listener.
+     *
+     * @return the request queue the responder is listening on
+     * @throws Exception if the connection cannot be obtained or any JMS operation fails
+     */
+    private Queue startAsyncRespondingJmsConsumerOnSeparateConnection() throws Exception
+    {
+        final String requestQueueName = getTestQueueName() + ".request";
+        final Connection responderConnection = getConnection();
+        responderConnection.start();
+        final Session responderSession = responderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue requestQueue = responderSession.createQueue(requestQueueName);
+
+        final MessageConsumer requestConsumer = responderSession.createConsumer(requestQueue);
+        requestConsumer.setMessageListener(new AsyncResponder(responderSession));
+
+        return requestQueue;
+    }
+
+    /**
+     * Listener that answers every request on the destination named by its JMSReplyTo header. Anything
+     * thrown while responding is stored in {@code _caughtException} for the test to inspect.
+     */
+    private final class AsyncResponder implements MessageListener
+    {
+        private final Session _responderSession;
+
+        private AsyncResponder(Session responderSession)
+        {
+            _responderSession = responderSession;
+        }
+
+        /**
+         * Builds a response correlated to the request and sends it to the request's reply-to destination.
+         *
+         * @param requestMessage the request delivered to the listener
+         */
+        @Override
+        public void onMessage(Message requestMessage)
+        {
+            try
+            {
+                Destination replyTo = getReplyToQueue(requestMessage);
+
+                Message responseMessage = _responderSession.createMessage();
+                responseMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
+
+                sendResponseToQueue(replyTo, responseMessage);
+            }
+            catch (Throwable t)
+            {
+                // Deliberately broad: nothing may escape the listener; the test asserts on this value.
+                _caughtException.set(t);
+            }
+        }
+
+        /**
+         * Reads the JMSReplyTo header of the request.
+         *
+         * @param requestMessage the request to inspect
+         * @return the reply-to destination, never null
+         * @throws JMSException if the header cannot be read, or (as {@link IllegalStateException}) if it is null
+         */
+        private Destination getReplyToQueue(Message requestMessage) throws JMSException
+        {
+            Destination replyTo = requestMessage.getJMSReplyTo();
+            if (replyTo == null)
+            {
+                throw new IllegalStateException("JMSReplyTo was null on message " + requestMessage);
+            }
+            return replyTo;
+        }
+
+        /**
+         * Sends the response to the reply-to destination using a short-lived producer.
+         *
+         * @param replyTo destination to send the response to
+         * @param responseMessage message to send
+         * @throws JMSException if the producer cannot be created, used or closed
+         */
+        private void sendResponseToQueue(Destination replyTo, Message responseMessage)
+                throws JMSException
+        {
+            MessageProducer responseProducer = _responderSession.createProducer(replyTo);
+            try
+            {
+                responseProducer.send(responseMessage);
+            }
+            finally
+            {
+                // A new producer is created per request, so release it once the response is sent.
+                responseProducer.close();
+            }
+        }
+    }
+
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]