Repository: activemq-artemis
Updated Branches:
  refs/heads/master 4870500ea -> aca2ae25f


ARTEMIS-1550 - Support LVQ for OpenWire 

Add support for LVQ, using the same property key as core "_AMQ_LVQ_NAME"
Update current LVQ test case to correctly test OpenWire LVQ.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fa805366
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fa805366
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fa805366

Branch: refs/heads/master
Commit: fa805366153157fcb7bb877e2377b92a418190f2
Parents: 4870500
Author: Michael André Pearce <[email protected]>
Authored: Sat Dec 9 15:07:21 2017 +0000
Committer: Michael André Pearce <[email protected]>
Committed: Sat Dec 9 15:07:21 2017 +0000

----------------------------------------------------------------------
 .../apache/activemq/artemis/api/core/Message.java |  4 ++++
 .../artemis/core/message/impl/CoreMessage.java    |  5 +++++
 .../artemis/protocol/amqp/broker/AMQPMessage.java |  8 ++++++--
 .../openwire/OpenWireMessageConverter.java        |  9 +++++++++
 .../integration/amqp/JMSClientTestSupport.java    |  2 +-
 .../tests/integration/amqp/JMSLVQTest.java        | 18 ++++++++++--------
 6 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index d7666b5..5e4dc74 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -183,6 +183,10 @@ public interface Message {
       return null;
    }
 
+   default Message setLastValueProperty(SimpleString lastValueName) {
+      return this;
+   }
+
    /**
     * @deprecated do not use this, use through ICoreMessage or ClientMessage
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 516addc..2500142 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -599,6 +599,11 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
    }
 
    @Override
+   public Message setLastValueProperty(SimpleString lastValueName) {
+      return putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValueName);
+   }
+
+   @Override
    public int getEncodeSize() {
       checkEncode();
       return buffer == null ? -1 : buffer.writerIndex();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 06a5894..2e06d2e 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -62,7 +62,6 @@ import org.apache.qpid.proton.message.impl.MessageImpl;
 // see 
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
 public class AMQPMessage extends RefCountMessage {
 
-   public static final String HDR_LAST_VALUE_NAME = 
org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString();
    public static final int DEFAULT_MESSAGE_PRIORITY = 4;
    public static final int MAX_MESSAGE_PRIORITY = 9;
 
@@ -1091,7 +1090,12 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public SimpleString getLastValueProperty() {
-      return getSimpleStringProperty(HDR_LAST_VALUE_NAME);
+      return getSimpleStringProperty(HDR_LAST_VALUE_NAME.toString());
+   }
+
+   @Override
+   public org.apache.activemq.artemis.api.core.Message 
setLastValueProperty(SimpleString lastValueName) {
+      return putStringProperty(HDR_LAST_VALUE_NAME, lastValueName);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 88f90ee..2f9fee4 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -771,6 +771,15 @@ public class OpenWireMessageConverter implements 
MessageConverter<OpenwireMessag
          }
       }
 
+      SimpleString lastValueProperty = coreMessage.getLastValueProperty();
+      if (lastValueProperty != null) {
+         try {
+            
amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(),
 lastValueProperty.toString());
+         } catch (JMSException e) {
+            throw new IOException("failure to set lvq property " + dlqCause, 
e);
+         }
+      }
+
       Set<SimpleString> props = coreMessage.getPropertyNames();
       if (props != null) {
          for (SimpleString s : props) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
index 190dd78..9e0d41a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
@@ -234,7 +234,7 @@ public abstract class JMSClientTestSupport extends 
AmqpClientTestSupport {
    }
 
    protected Connection createOpenWireConnection() throws JMSException {
-      return createCoreConnection(getBrokerOpenWireJMSConnectionString(), 
null, null, null, true);
+      return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), 
null, null, null, true);
    }
 
    private Connection createOpenWireConnection(String connectionString, String 
username, String password, String clientId, boolean start) throws JMSException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fa805366/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
index 38af6bf..9d7dd19 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
@@ -17,13 +17,13 @@
 package org.apache.activemq.artemis.tests.integration.amqp;
 
 import javax.jms.Connection;
-import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -124,23 +124,25 @@ public class JMSLVQTest extends JMSClientTestSupport {
          MessageProducer p = producerSession.createProducer(null);
 
          TextMessage message1 = producerSession.createTextMessage();
-         message1.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY");
+         message1.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), 
"KEY");
          message1.setText("hello");
          p.send(queue1, message1);
 
          TextMessage message2 = producerSession.createTextMessage();
-         message2.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY");
+         message2.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), 
"KEY");
          message2.setText("how are you");
          p.send(queue1, message2);
 
-         Session consumerSession = consumerConnection.createSession();
+         //Simulate a small pause, else both messages could be consumed if 
consumer is fast enough
+         Thread.sleep(10);
+         
+         Session consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
          Queue consumerQueue = consumerSession.createQueue(LVQ_QUEUE_NAME);
          MessageConsumer consumer = 
consumerSession.createConsumer(consumerQueue);
-         Message msg = consumer.receive(1000);
+         TextMessage msg = (TextMessage) consumer.receive(1000);
          assertNotNull(msg);
-         assertEquals("KEY", 
msg.getStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME));
-         assertTrue(msg instanceof TextMessage);
-         assertEquals("how are you", ((TextMessage)msg).getText());
+         assertEquals("KEY", 
msg.getStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME.toString()));
+         assertEquals("how are you", msg.getText());
          consumer.close();
       } finally {
          producerConnection.close();

Reply via email to