Repository: activemq-artemis Updated Branches: refs/heads/master 4870500ea -> aca2ae25f
ARTEMIS-1550 - Support LVQ for OpenWire Add support for LVQ, using the same property key as core "_AMQ_LVQ_NAME" Update current LVQ test case to correctly test OpenWire LVQ. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fa805366 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fa805366 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fa805366 Branch: refs/heads/master Commit: fa805366153157fcb7bb877e2377b92a418190f2 Parents: 4870500 Author: Michael André Pearce <[email protected]> Authored: Sat Dec 9 15:07:21 2017 +0000 Committer: Michael André Pearce <[email protected]> Committed: Sat Dec 9 15:07:21 2017 +0000 ---------------------------------------------------------------------- .../apache/activemq/artemis/api/core/Message.java | 4 ++++ .../artemis/core/message/impl/CoreMessage.java | 5 +++++ .../artemis/protocol/amqp/broker/AMQPMessage.java | 8 ++++++-- .../openwire/OpenWireMessageConverter.java | 9 +++++++++ .../integration/amqp/JMSClientTestSupport.java | 2 +- .../tests/integration/amqp/JMSLVQTest.java | 18 ++++++++++-------- 6 files changed, 35 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index d7666b5..5e4dc74 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -183,6 +183,10 @@ public interface Message { return null; } + default Message setLastValueProperty(SimpleString lastValueName) { + return this; + } + /** * @deprecated do not use this, use through ICoreMessage or ClientMessage */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 516addc..2500142 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -599,6 +599,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { } @Override + public Message setLastValueProperty(SimpleString lastValueName) { + return putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValueName); + } + + @Override public int getEncodeSize() { checkEncode(); return buffer == null ? -1 : buffer.writerIndex(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 06a5894..2e06d2e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -62,7 +62,6 @@ import org.apache.qpid.proton.message.impl.MessageImpl; // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format public class AMQPMessage extends RefCountMessage { - public static final String HDR_LAST_VALUE_NAME = org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(); public static final int DEFAULT_MESSAGE_PRIORITY = 4; public static final int MAX_MESSAGE_PRIORITY = 9; @@ -1091,7 +1090,12 @@ public class AMQPMessage extends RefCountMessage { @Override public SimpleString getLastValueProperty() { - return getSimpleStringProperty(HDR_LAST_VALUE_NAME); + return getSimpleStringProperty(HDR_LAST_VALUE_NAME.toString()); + } + + @Override + public org.apache.activemq.artemis.api.core.Message setLastValueProperty(SimpleString lastValueName) { + return putStringProperty(HDR_LAST_VALUE_NAME, lastValueName); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 88f90ee..2f9fee4 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -771,6 +771,15 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag } } + SimpleString lastValueProperty = coreMessage.getLastValueProperty(); + if (lastValueProperty != null) { + try { + amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString()); + } catch (JMSException e) { + throw new IOException("failure to set lvq property " + dlqCause, e); + } + } + Set<SimpleString> props = coreMessage.getPropertyNames(); if (props != null) { for (SimpleString s : props) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java index 190dd78..9e0d41a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java @@ -234,7 +234,7 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { } protected Connection createOpenWireConnection() throws JMSException { - return createCoreConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true); + return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true); } private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java index 38af6bf..9d7dd19 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java @@ -17,13 +17,13 @@ package org.apache.activemq.artemis.tests.integration.amqp; import javax.jms.Connection; -import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -124,23 +124,25 @@ public class JMSLVQTest extends JMSClientTestSupport { MessageProducer p = producerSession.createProducer(null); TextMessage message1 = producerSession.createTextMessage(); - message1.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY"); + message1.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "KEY"); message1.setText("hello"); p.send(queue1, message1); TextMessage message2 = producerSession.createTextMessage(); - message2.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY"); + message2.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "KEY"); message2.setText("how are you"); p.send(queue1, message2); - Session consumerSession = consumerConnection.createSession(); + //Simulate a small pause, else both messages could be consumed if consumer is fast enough + Thread.sleep(10); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue consumerQueue = consumerSession.createQueue(LVQ_QUEUE_NAME); MessageConsumer consumer = consumerSession.createConsumer(consumerQueue); - Message msg = consumer.receive(1000); + TextMessage msg = (TextMessage) consumer.receive(1000); assertNotNull(msg); - assertEquals("KEY", msg.getStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME)); - assertTrue(msg instanceof TextMessage); - assertEquals("how are you", ((TextMessage)msg).getText()); + assertEquals("KEY", msg.getStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME.toString())); + assertEquals("how are you", msg.getText()); consumer.close(); } finally { producerConnection.close();
