Repository: activemq Updated Branches: refs/heads/master e333fd957 -> 351d4b9de
https://issues.apache.org/jira/browse/AMQ-5666 Add some additional tests to validate AMQP behavior Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/351d4b9d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/351d4b9d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/351d4b9d Branch: refs/heads/master Commit: 351d4b9dea128a1faedd55fab448931f374922eb Parents: e333fd9 Author: Timothy Bish <[email protected]> Authored: Mon Mar 30 17:20:52 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Mar 30 17:20:52 2015 -0400 ---------------------------------------------------------------------- .../transport/amqp/client/AmqpMessage.java | 26 +++++++ .../amqp/interop/AmqpSendReceiveTest.java | 78 ++++++++++++++++++++ 2 files changed, 104 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/351d4b9d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 9db12f9..e5d2d97 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -211,6 +211,32 @@ public class AmqpMessage { } /** + * Sets the GroupId property on an outbound message using the provided String + * + * @param messageId + * the String Group ID value to set. + */ + public void setGroupId(String groupId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setGroupId(groupId); + } + + /** + * Return the set GroupId value in String form, if there are no properties + * in the given message return null. + * + * @return the set GroupID in String form or null if not set. + */ + public String getGroupId() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getGroupId(); + } + + /** * Sets a given application property on an outbound message. * * @param key http://git-wip-us.apache.org/repos/asf/activemq/blob/351d4b9d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java index e7058e5..822edee 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.interop; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import java.util.concurrent.TimeUnit; @@ -76,4 +77,81 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver2.close(); connection.close(); } + + @Test(timeout = 60000) + public void testReceiveWithJMSSelectorFilter() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpMessage message = new AmqpMessage(); + + message.setGroupId("abcdefg"); + message.setApplicationProperty("sn", 100); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + sender.send(message); + sender.close(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), "sn = 100"); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + assertEquals(100, received.getApplicationProperty("sn")); + assertEquals("abcdefg", received.getGroupId()); + received.accept(); + + receiver.close(); + } + + @Test(timeout = 30000) + public void testAdvancedLinkFlowControl() throws Exception { + final int MSG_COUNT = 20; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + for (int i = 0; i < MSG_COUNT; i++) { + AmqpMessage message = new AmqpMessage(); + + message.setMessageId("msg" + i); + message.setMessageAnnotation("serialNo", i); + message.setText("Test-Message"); + + sender.send(message); + } + + sender.close(); + + AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName()); + receiver1.flow(2); + AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS); + AmqpMessage message2 = receiver1.receive(5, TimeUnit.SECONDS); + assertEquals("msg0", message1.getMessageId()); + assertEquals("msg1", message2.getMessageId()); + message1.accept(); + message2.accept(); + + AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName()); + receiver2.flow(2); + AmqpMessage message3 = receiver2.receive(5, TimeUnit.SECONDS); + AmqpMessage message4 = receiver2.receive(5, TimeUnit.SECONDS); + assertEquals("msg2", message3.getMessageId()); + assertEquals("msg3", message4.getMessageId()); + message3.accept(); + message4.accept(); + + receiver1.flow(MSG_COUNT - 4); + for (int i = 4; i < MSG_COUNT - 4; i++) { + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertEquals("msg" + i, message.getMessageId()); + message.accept(); + } + + receiver1.close(); + receiver2.close(); + } }
