This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new ea479e1 ARTEMIS-2311 Add additional tests for reply to cross protocol
new b66adc2 This closes #2644
ea479e1 is described below
commit ea479e1f54db4f0a672fe4a9e74cc3972c512c35
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Apr 25 12:28:31 2019 -0400
ARTEMIS-2311 Add additional tests for reply to cross protocol
Add a broader range of tests for AMQP message reply to handling
across protocols to help spot potential future regressions.
---
.../crossprotocol/RequestReplyNonJMSTest.java | 325 ++++++++++++++++++++-
1 file changed, 309 insertions(+), 16 deletions(-)
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java
index 7c7852c..d15ecd8 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java
@@ -14,16 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.activemq.artemis.tests.integration.crossprotocol;
+import static
org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.UUID;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
-import java.net.URI;
-import java.util.Arrays;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -42,17 +48,17 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import static
org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
-
@RunWith(Parameterized.class)
public class RequestReplyNonJMSTest extends OpenWireTestBase {
- String protocolConsumer;
- ConnectionFactory consumerCF;
private static final SimpleString queueName =
SimpleString.toSimpleString("RequestReplyQueueTest");
private static final SimpleString topicName =
SimpleString.toSimpleString("RequestReplyTopicTest");
private static final SimpleString replyQueue =
SimpleString.toSimpleString("ReplyOnRequestReplyQueueTest");
+ private final String protocolConsumer;
+
+ private ConnectionFactory consumerCF;
+
public RequestReplyNonJMSTest(String protocolConsumer) {
this.protocolConsumer = protocolConsumer;
}
@@ -66,8 +72,6 @@ public class RequestReplyNonJMSTest extends OpenWireTestBase {
});
}
-
-
@Before
public void setupCF() {
consumerCF = createConnectionFactory(protocolConsumer, urlString);
@@ -83,15 +87,64 @@ public class RequestReplyNonJMSTest extends
OpenWireTestBase {
((PostOfficeImpl)this.server.getPostOffice()).getAddressManager().addAddressInfo(info);
}
-
@Test
- public void testReplyToSourceAMQP() throws Throwable {
+ public void testReplyToFromAMQPClientWithInvalidTypeAnnotation() throws
Throwable {
+ AmqpClient directClient = new AmqpClient(new
URI("tcp://localhost:61616"), null, null);
+ AmqpConnection connection = null;
+ AmqpSession session = null;
+ AmqpSender sender = null;
+ Connection consumerConn = null;
+
+ try {
+ connection = directClient.connect(true);
+ session = connection.createSession();
+ sender = session.createSender(queueName.toString());
+
+ AmqpMessage message = new AmqpMessage();
+ message = new AmqpMessage();
+ message.setReplyToAddress(replyQueue.toString());
+ message.setMessageAnnotation("x-opt-jms-reply-to", new
Byte((byte)10)); // that's invalid on the conversion, lets hope it doesn't fail
+ message.setMessageId("msg-1");
+ sender.send(message);
+
+ consumerConn = consumerCF.createConnection();
+ Session consumerSess = consumerConn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = consumerSess.createQueue(queueName.toString());
+ Queue replyQueue =
consumerSess.createQueue(RequestReplyNonJMSTest.replyQueue.toString());
+
+ MessageConsumer consumer = consumerSess.createConsumer(queue);
+ consumerConn.start();
+ javax.jms.Message receivedMessage = consumer.receive(5000);
+ Assert.assertNotNull(receivedMessage);
+ Assert.assertEquals(replyQueue, receivedMessage.getJMSReplyTo());
+ Assert.assertTrue(receivedMessage.getJMSReplyTo() instanceof
javax.jms.Queue);
+
+ Assert.assertNull(consumer.receiveNoWait());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ try {
+ connection.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ try {
+ consumerConn.close();
+ } catch (Throwable dontcare) {
+ dontcare.printStackTrace();
+ }
+ }
+ }
+ @Test
+ public void testReplyToFromAMQPClientWithNoTypeOrOtherAnnotations() throws
Throwable {
AmqpClient directClient = new AmqpClient(new
URI("tcp://localhost:61616"), null, null);
AmqpConnection connection = null;
AmqpSession session = null;
AmqpSender sender = null;
Connection consumerConn = null;
+
try {
connection = directClient.connect(true);
session = connection.createSession();
@@ -103,10 +156,54 @@ public class RequestReplyNonJMSTest extends
OpenWireTestBase {
message.setText("Test-Message");
sender.send(message);
- message = new AmqpMessage();
+ consumerConn = consumerCF.createConnection();
+ Session consumerSess = consumerConn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = consumerSess.createQueue(queueName.toString());
+ Queue replyQueue =
consumerSess.createQueue(RequestReplyNonJMSTest.replyQueue.toString());
+
+ MessageConsumer consumer = consumerSess.createConsumer(queue);
+ consumerConn.start();
+ javax.jms.Message receivedMessage = consumer.receive(5000);
+ Assert.assertNotNull(receivedMessage);
+ Assert.assertEquals(replyQueue, receivedMessage.getJMSReplyTo());
+ Assert.assertTrue(receivedMessage.getJMSReplyTo() instanceof
javax.jms.Queue);
+
+ Assert.assertNull(consumer.receiveNoWait());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ try {
+ connection.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ try {
+ consumerConn.close();
+ } catch (Throwable dontcare) {
+ dontcare.printStackTrace();
+ }
+ }
+ }
+
+ @Test
+ public void testReplyToFromAMQPClientWithNoTypeButWithOtherAnnotations()
throws Throwable {
+ AmqpClient directClient = new AmqpClient(new
URI("tcp://localhost:61616"), null, null);
+ AmqpConnection connection = null;
+ AmqpSession session = null;
+ AmqpSender sender = null;
+ Connection consumerConn = null;
+
+ try {
+ connection = directClient.connect(true);
+ session = connection.createSession();
+ sender = session.createSender(queueName.toString());
+
+ AmqpMessage message = new AmqpMessage();
message.setReplyToAddress(replyQueue.toString());
- message.setMessageAnnotation("x-opt-jms-reply-to", new
Byte((byte)10)); // that's invalid on the conversion, lets hope it doesn't fail
- message.setMessageId("msg-2");
+ message.setMessageId("msg-1");
+ message.setMessageAnnotation("x-opt-not-jms-reply-to", new
Byte((byte)1));
+ message.setText("Test-Message");
sender.send(message);
consumerConn = consumerCF.createConnection();
@@ -119,10 +216,57 @@ public class RequestReplyNonJMSTest extends
OpenWireTestBase {
javax.jms.Message receivedMessage = consumer.receive(5000);
Assert.assertNotNull(receivedMessage);
Assert.assertEquals(replyQueue, receivedMessage.getJMSReplyTo());
+ Assert.assertTrue(receivedMessage.getJMSReplyTo() instanceof
javax.jms.Queue);
+
+ Assert.assertNull(consumer.receiveNoWait());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ try {
+ connection.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ try {
+ consumerConn.close();
+ } catch (Throwable dontcare) {
+ dontcare.printStackTrace();
+ }
+ }
+ }
+
+ @Test
+ public void testReplyToFromAMQPClientWithQueueReplyToAddress() throws
Throwable {
+ AmqpClient directClient = new AmqpClient(new
URI("tcp://localhost:61616"), null, null);
+ AmqpConnection connection = null;
+ AmqpSession session = null;
+ AmqpSender sender = null;
+ Connection consumerConn = null;
+
+ try {
+ connection = directClient.connect(true);
+ session = connection.createSession();
+ sender = session.createSender(queueName.toString());
+
+ AmqpMessage message = new AmqpMessage();
+ message.setReplyToAddress(replyQueue.toString());
+ message.setMessageId("msg-1");
+ message.setMessageAnnotation("x-opt-jms-reply-to", new Byte((byte)0));
+ message.setText("Test-Message");
+ sender.send(message);
- receivedMessage = consumer.receive(5000);
+ consumerConn = consumerCF.createConnection();
+ Session consumerSess = consumerConn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = consumerSess.createQueue(queueName.toString());
+ Queue replyQueue =
consumerSess.createQueue(RequestReplyNonJMSTest.replyQueue.toString());
+
+ MessageConsumer consumer = consumerSess.createConsumer(queue);
+ consumerConn.start();
+ javax.jms.Message receivedMessage = consumer.receive(5000);
Assert.assertNotNull(receivedMessage);
Assert.assertEquals(replyQueue, receivedMessage.getJMSReplyTo());
+ Assert.assertTrue(receivedMessage.getJMSReplyTo() instanceof
javax.jms.Queue);
Assert.assertNull(consumer.receiveNoWait());
} catch (Throwable e) {
@@ -142,6 +286,155 @@ public class RequestReplyNonJMSTest extends
OpenWireTestBase {
}
}
-}
+ @Test
+ public void testReplyToFromAMQPClientWithTopicReplyToAddress() throws
Throwable {
+ AmqpClient directClient = new AmqpClient(new
URI("tcp://localhost:61616"), null, null);
+ AmqpConnection connection = null;
+ AmqpSession session = null;
+ AmqpSender sender = null;
+ Connection consumerConn = null;
+
+ try {
+ connection = directClient.connect(true);
+ session = connection.createSession();
+ sender = session.createSender(queueName.toString());
+
+ AmqpMessage message = new AmqpMessage();
+ message.setReplyToAddress(topicName.toString());
+ message.setMessageId("msg-1");
+ message.setMessageAnnotation("x-opt-jms-reply-to", new Byte((byte)1));
+ message.setText("Test-Message");
+ sender.send(message);
+
+ consumerConn = consumerCF.createConnection();
+ Session consumerSess = consumerConn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = consumerSess.createQueue(queueName.toString());
+ Topic replyTopic =
consumerSess.createTopic(RequestReplyNonJMSTest.topicName.toString());
+
+ MessageConsumer consumer = consumerSess.createConsumer(queue);
+ consumerConn.start();
+ javax.jms.Message receivedMessage = consumer.receive(5000);
+ Assert.assertNotNull(receivedMessage);
+ Assert.assertEquals(replyTopic, receivedMessage.getJMSReplyTo());
+ Assert.assertTrue(receivedMessage.getJMSReplyTo() instanceof
javax.jms.Topic);
+ Assert.assertNull(consumer.receiveNoWait());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ try {
+ connection.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ try {
+ consumerConn.close();
+ } catch (Throwable dontcare) {
+ dontcare.printStackTrace();
+ }
+ }
+ }
+
+ @Test
+ public void testReplyToFromAMQPClientWithTempTopicReplyToAddress() throws
Throwable {
+ AmqpClient directClient = new AmqpClient(new
URI("tcp://localhost:61616"), null, null);
+ AmqpConnection connection = null;
+ AmqpSession session = null;
+ AmqpSender sender = null;
+ Connection consumerConn = null;
+
+ try {
+ connection = directClient.connect(true);
+ session = connection.createSession();
+ sender = session.createSender(queueName.toString());
+
+ String replyToName = UUID.randomUUID().toString();
+ AmqpMessage message = new AmqpMessage();
+ message.setReplyToAddress(replyToName);
+ message.setMessageId("msg-1");
+ message.setMessageAnnotation("x-opt-jms-reply-to", new Byte((byte)3));
+ message.setText("Test-Message");
+ sender.send(message);
+
+ consumerConn = consumerCF.createConnection();
+ Session consumerSess = consumerConn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = consumerSess.createQueue(queueName.toString());
+
+ MessageConsumer consumer = consumerSess.createConsumer(queue);
+ consumerConn.start();
+ javax.jms.Message receivedMessage = consumer.receive(5000);
+ Assert.assertNotNull(receivedMessage);
+ Assert.assertTrue(receivedMessage.getJMSReplyTo() instanceof
javax.jms.TemporaryTopic);
+ Assert.assertEquals(replyToName, ((TemporaryTopic)
receivedMessage.getJMSReplyTo()).getTopicName());
+
+ Assert.assertNull(consumer.receiveNoWait());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ try {
+ connection.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ try {
+ consumerConn.close();
+ } catch (Throwable dontcare) {
+ dontcare.printStackTrace();
+ }
+ }
+ }
+
+ @Test
+ public void testReplyToFromAMQPClientWithTempQueueReplyToAddress() throws
Throwable {
+ AmqpClient directClient = new AmqpClient(new
URI("tcp://localhost:61616"), null, null);
+ AmqpConnection connection = null;
+ AmqpSession session = null;
+ AmqpSender sender = null;
+ Connection consumerConn = null;
+
+ try {
+ connection = directClient.connect(true);
+ session = connection.createSession();
+ sender = session.createSender(queueName.toString());
+
+ String replyToName = UUID.randomUUID().toString();
+
+ AmqpMessage message = new AmqpMessage();
+ message.setReplyToAddress(replyToName);
+ message.setMessageId("msg-1");
+ message.setMessageAnnotation("x-opt-jms-reply-to", new Byte((byte)2));
+ message.setText("Test-Message");
+ sender.send(message);
+
+ consumerConn = consumerCF.createConnection();
+ Session consumerSess = consumerConn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = consumerSess.createQueue(queueName.toString());
+
+ MessageConsumer consumer = consumerSess.createConsumer(queue);
+ consumerConn.start();
+ javax.jms.Message receivedMessage = consumer.receive(5000);
+ Assert.assertNotNull(receivedMessage);
+ Assert.assertTrue(receivedMessage.getJMSReplyTo() instanceof
javax.jms.TemporaryQueue);
+ Assert.assertEquals(replyToName, ((TemporaryQueue)
receivedMessage.getJMSReplyTo()).getQueueName());
+
+ Assert.assertNull(consumer.receiveNoWait());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ try {
+ connection.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ try {
+ consumerConn.close();
+ } catch (Throwable dontcare) {
+ dontcare.printStackTrace();
+ }
+ }
+ }
+}