Repository: activemq Updated Branches: refs/heads/master a095e9b9d -> 11da37b99
https://issues.apache.org/jira/browse/AMQ-5828 Fixed bad default for message persistence that breaks AMQP specification defined behavior when the durable value is not present in the Header. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/11da37b9 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/11da37b9 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/11da37b9 Branch: refs/heads/master Commit: 11da37b991d9ff6e7419a0ee0c8929a7a8dabced Parents: a095e9b Author: Timothy Bish <[email protected]> Authored: Fri Jun 5 11:21:27 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Jun 5 11:21:27 2015 -0400 ---------------------------------------------------------------------- .../amqp/message/AMQPRawInboundTransformer.java | 5 ++- .../amqp/message/InboundTransformer.java | 2 +- .../transport/amqp/AmqpTransformerTest.java | 10 ++--- .../transport/amqp/client/AmqpMessage.java | 33 ++++++++++++++- .../amqp/interop/AmqpSendReceiveTest.java | 43 ++++++++++++++++++++ 5 files changed, 85 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/11da37b9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java index d60a96b..e1414df 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.message; import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; import javax.jms.Message; public class AMQPRawInboundTransformer extends InboundTransformer { @@ -40,7 +41,9 @@ public class AMQPRawInboundTransformer extends InboundTransformer { BytesMessage rc = vendor.createBytesMessage(); rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); - rc.setJMSDeliveryMode(defaultDeliveryMode); + // We cannot decode the message headers to check so err on the side of caution + // and mark all messages as persistent. + rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT); rc.setJMSPriority(defaultPriority); final long now = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/activemq/blob/11da37b9/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java index 1310bcd..26d5753 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java @@ -51,7 +51,7 @@ public abstract class InboundTransformer { String prefixMessageAnnotations = "MA_"; String prefixFooter = "FT_"; - int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; + int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT; int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY; long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; http://git-wip-us.apache.org/repos/asf/activemq/blob/11da37b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java index 0c2c6f7..b513c1a 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java @@ -25,6 +25,7 @@ import java.net.URI; import javax.jms.BytesMessage; import javax.jms.Connection; +import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -89,7 +90,7 @@ public class AmqpTransformerTest { Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); assertEquals(0L, messageFormat.longValue()); assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed); - assertEquals(2, message.getJMSDeliveryMode()); + assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); assertEquals(7, message.getJMSPriority()); c.close(); @@ -138,10 +139,9 @@ public class AmqpTransformerTest { Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); assertEquals(0L, messageFormat.longValue()); assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed); - assertEquals(2, message.getJMSDeliveryMode()); + assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); - // should not equal 7 (should equal the default) because "raw" does not map - // headers + // should not equal 7 (should equal the default) because "raw" does not map headers assertEquals(4, message.getJMSPriority()); c.close(); @@ -187,7 +187,7 @@ public class AmqpTransformerTest { Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); assertEquals(0L, messageFormat.longValue()); assertFalse("Didn't use the correct transformation, expected NOT to be NATIVE", nativeTransformationUsed); - assertEquals(2, message.getJMSDeliveryMode()); + assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); c.close(); session.close(); http://git-wip-us.apache.org/repos/asf/activemq/blob/11da37b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index d29d620..0acd1c6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -29,6 +29,7 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.engine.Delivery; @@ -53,7 +54,6 @@ public class AmqpMessage { delivery = null; message = Proton.message(); - message.setDurable(true); } /** @@ -247,6 +247,32 @@ public class AmqpMessage { } /** + * Sets the durable header on the outgoing message. + * + * @param durable + * the boolean durable value to set. + */ + public void setDurable(boolean durable) { + checkReadOnly(); + lazyCreateHeader(); + getWrappedMessage().setDurable(durable); + } + + /** + * Checks the durable value in the Message Headers to determine if + * the message was sent as a durable Message. + * + * @return true if the message is marked as being durable. + */ + public boolean isDurable() { + if (message.getHeader() == null) { + return false; + } + + return message.getHeader().getDurable(); + } + + /** * Sets a given application property on an outbound message. * * @param key @@ -448,6 +474,11 @@ public class AmqpMessage { } } + private void lazyCreateHeader() { + if (message.getHeader() == null) { + message.setHeader(new Header()); + } + } private void lazyCreateProperties() { if (message.getProperties() == null) { message.setProperties(new Properties()); http://git-wip-us.apache.org/repos/asf/activemq/blob/11da37b9/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java index 822edee..29ff954 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java @@ -17,7 +17,9 @@ package org.apache.activemq.transport.amqp.interop; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import java.util.concurrent.TimeUnit; @@ -154,4 +156,45 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver1.close(); receiver2.close(); } + + @Test(timeout = 60000) + public void testMessageDurabliltyFollowsSpec() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName()); + + QueueViewMBean queue = getProxyToQueue(getTestName()); + + // Create default message that should be sent as non-durable + AmqpMessage message1 = new AmqpMessage(); + message1.setText("Test-Message -> non-durable"); + message1.setDurable(false); + message1.setMessageId("ID:Message:1"); + sender.send(message1); + + assertEquals(1, queue.getQueueSize()); + receiver1.flow(1); + message1 = receiver1.receive(50, TimeUnit.SECONDS); + assertFalse("First message sent should not be durable", message1.isDurable()); + message1.accept(); + + // Create default message that should be sent as non-durable + AmqpMessage message2 = new AmqpMessage(); + message2.setText("Test-Message -> durable"); + message2.setDurable(true); + message2.setMessageId("ID:Message:2"); + sender.send(message2); + + assertEquals(1, queue.getQueueSize()); + receiver1.flow(1); + message2 = receiver1.receive(50, TimeUnit.SECONDS); + assertTrue("Second message sent should be durable", message2.isDurable()); + message2.accept(); + + sender.close(); + connection.close(); + } }
