ARTEMIS-1100 Store Header on AMQP message
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/01362bbb Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/01362bbb Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/01362bbb Branch: refs/heads/master Commit: 01362bbb1d423c3859442664508533bdf1755772 Parents: 7304416 Author: Clebert Suconic <[email protected]> Authored: Fri Apr 7 09:13:43 2017 -0400 Committer: Justin Bertram <[email protected]> Committed: Fri Apr 7 09:11:11 2017 -0500 ---------------------------------------------------------------------- .../protocol/amqp/broker/AMQPMessage.java | 4 +- .../amqp/AmqpMessagePriorityTest.java | 42 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/01362bbb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index cc0d1d8..ffc3783 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -1013,14 +1013,14 @@ public class AMQPMessage extends RefCountMessage { } private int internalPersistSize() { - return data.array().length - sendFrom; + return data.array().length; } @Override public void persist(ActiveMQBuffer targetRecord) { checkBuffer(); targetRecord.writeInt(internalPersistSize()); - targetRecord.writeBytes(data.array(), sendFrom, data.array().length - sendFrom); + targetRecord.writeBytes(data.array(), 0, data.array().length ); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/01362bbb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java index 39f6eac..d1467b1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java @@ -67,6 +67,48 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { connection.close(); } + + @Test(timeout = 60000) + public void testRestartServer() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setDurable(true); + message.setMessageId("MessageID:1"); + message.setPriority((short) 7); + + + sender.send(message); + sender.close(); + connection.close(); + + server.stop(); + server.start(); + + client = createAmqpClient(); + connection = addConnection(client.connect()); + session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 7, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + @Test(timeout = 60000) public void testMessageNonDefaultPriority() throws Exception {
