ARTEMIS-853 Support for exclusive consumers

Support exclusive consumer
Allow default address level settings for exclusive consumer
Allow queue level setting in broker.xml
Add the ability to set queue settings via Core JMS using address. Similar to 
ActiveMQ 5.X
Allow for Core JMS client to define exclusive consumer using address parameters
Add tests

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/47c9a90d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/47c9a90d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/47c9a90d

Branch: refs/heads/master
Commit: 47c9a90dcb16b70aeef3a09c89400669add083f5
Parents: 56e1df3
Author: Michael André Pearce <michael.andre.pea...@me.com>
Authored: Tue Jan 30 17:43:14 2018 +0000
Committer: Michael Pearce <michael.pea...@ig.com>
Committed: Wed Feb 7 15:19:01 2018 +0000

----------------------------------------------------------------------
 .../artemis/api/core/ParameterisedAddress.java  | 117 +++++++
 .../artemis/api/core/QueueAttributes.java       |  79 +++++
 .../config/ActiveMQDefaultConfiguration.java    |  12 +
 .../artemis/api/core/client/ClientSession.java  |  77 +++++
 .../core/management/ActiveMQServerControl.java  |  16 +
 .../api/core/management/QueueControl.java       |  12 +
 .../core/client/impl/AddressQueryImpl.java      |  20 +-
 .../core/client/impl/ClientSessionImpl.java     |  96 +++++-
 .../core/client/impl/QueueQueryImpl.java        |  33 ++
 .../core/impl/ActiveMQSessionContext.java       |  45 ++-
 .../impl/wireformat/CreateQueueMessage_V2.java  |  41 ++-
 .../wireformat/CreateSharedQueueMessage_V2.java |  75 ++++-
 .../SessionBindingQueryResponseMessage_V4.java  |  35 +-
 .../SessionQueueQueryResponseMessage_V3.java    |  49 ++-
 .../artemis/core/server/QueueQueryResult.java   |  20 +-
 .../spi/core/remoting/SessionContext.java       |  25 ++
 .../artemis/jms/client/ActiveMQDestination.java |  28 +-
 .../jms/client/ActiveMQMessageProducer.java     |  26 +-
 .../artemis/jms/client/ActiveMQSession.java     |  88 ++++-
 .../jms/client/ActiveMQParameterTest.java       |  46 +++
 .../client/HornetQClientSessionContext.java     |   2 +-
 .../core/config/CoreQueueConfiguration.java     |  36 +++
 .../deployers/impl/FileConfigurationParser.java |  19 +-
 .../impl/ActiveMQServerControlImpl.java         |  15 +-
 .../core/management/impl/QueueControlImpl.java  |  24 ++
 .../core/management/impl/view/QueueView.java    |   8 +-
 .../view/predicate/QueueFilterPredicate.java    |   6 +-
 .../core/persistence/QueueBindingInfo.java      |   4 +
 .../journal/AbstractJournalStorageManager.java  |   2 +-
 .../codec/PersistentQueueBindingEncoding.java   |  26 +-
 .../artemis/core/postoffice/PostOffice.java     |   3 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   7 +-
 .../core/ServerSessionPacketHandler.java        |   8 +-
 .../artemis/core/server/ActiveMQServer.java     |  17 +
 .../artemis/core/server/BindingQueryResult.java |  20 +-
 .../activemq/artemis/core/server/Queue.java     |   6 +
 .../artemis/core/server/QueueConfig.java        |  39 ++-
 .../artemis/core/server/ServerSession.java      |  31 ++
 .../core/server/impl/ActiveMQServerImpl.java    |  78 ++++-
 .../core/server/impl/LastValueQueue.java        |   8 +-
 .../server/impl/PostOfficeJournalLoader.java    |   1 +
 .../core/server/impl/QueueFactoryImpl.java      |  11 +-
 .../artemis/core/server/impl/QueueImpl.java     |  50 ++-
 .../core/server/impl/ServerSessionImpl.java     |  71 ++++-
 .../core/settings/impl/AddressSettings.java     |  60 +++-
 .../resources/schema/artemis-configuration.xsd  |  22 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  15 +
 .../test/resources/artemis-configuration.xsd    |  23 +-
 .../tests/integration/amqp/JMSLVQTest.java      |   2 +-
 .../client/MultipleProducersTest.java           |   2 +-
 .../client/SessionCreateAndDeleteQueueTest.java |   2 +-
 .../integration/client/UpdateQueueTest.java     |   2 +-
 .../integration/jms/client/ExclusiveTest.java   | 319 +++++++++++++++++++
 .../tests/integration/jms/client/LVQTest.java   | 145 +++++++++
 .../ActiveMQServerControlUsingCoreTest.java     |  10 +
 .../management/QueueControlUsingCoreTest.java   |  10 +
 .../integration/server/LVQRecoveryTest.java     |   4 +-
 .../tests/integration/server/LVQTest.java       |   2 +-
 .../jms/tests/message/MessageHeaderTest.java    |  27 ++
 .../unit/core/postoffice/impl/FakeQueue.java    |  17 +
 .../core/server/impl/fakes/FakePostOffice.java  |   3 +-
 61 files changed, 1967 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java
new file mode 100644
index 0000000..bbc3c4d
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.core;
+
+import java.util.Map;
+
+/**
+ * An address that can carry queue attributes as a query-string style suffix, for example
+ * {@code my.address?exclusive=true&max-consumers=1}.
+ * <p>
+ * The text before the first {@code ?} is the plain address; the remainder is a list of
+ * {@code key=value} pairs separated by {@code &} that are applied to a {@link QueueAttributes}.
+ */
+public class ParameterisedAddress {
+
+   // The separator constants are final so that no caller can reassign them and thereby
+   // change how every parameterised address is parsed and rendered.
+   public static final char PARAMETER_SEPERATOR = '&';
+   public static final char PARAMETER_KEY_VALUE_SEPERATOR = '=';
+   public static final char PARAMETER_MARKER = '?';
+   public static final String PARAMETER_SEPERATOR_STRING = Character.toString(PARAMETER_SEPERATOR);
+   public static final String PARAMETER_KEY_VALUE_SEPERATOR_STRING = Character.toString(PARAMETER_KEY_VALUE_SEPERATOR);
+   public static final String PARAMETER_MARKER_STRING = Character.toString(PARAMETER_MARKER);
+
+   private final SimpleString address;
+   private final QueueAttributes queueAttributes;
+
+   /**
+    * Appends the given parameters to an address.
+    *
+    * @param address    the plain address
+    * @param parameters the parameters to append; may be {@code null} or empty
+    * @return the address followed by {@code ?key=value&key=value...}, or the same
+    *         {@code address} instance when there are no parameters
+    */
+   public static SimpleString toParameterisedAddress(SimpleString address, Map<String, String> parameters) {
+      if (parameters != null && parameters.size() > 0) {
+         return SimpleString.toSimpleString(toParameterisedAddress(address.toString(), parameters));
+      } else {
+         return address;
+      }
+   }
+
+   /**
+    * Appends the given parameters to an address.
+    *
+    * @param address    the plain address
+    * @param parameters the parameters to append; may be {@code null} or empty
+    * @return the address followed by {@code ?key=value&key=value...}, or the unchanged
+    *         address when there are no parameters
+    */
+   public static String toParameterisedAddress(String address, Map<String, String> parameters) {
+      if (parameters != null && parameters.size() > 0) {
+         StringBuilder stringBuilder = new StringBuilder(address).append(PARAMETER_MARKER);
+         return toParameterString(stringBuilder, parameters).toString();
+      } else {
+         return address;
+      }
+   }
+
+   /**
+    * Appends {@code key=value} pairs, separated by {@code &}, in the iteration order of the map.
+    */
+   private static StringBuilder toParameterString(StringBuilder stringBuilder, Map<String, String> parameters) {
+      boolean first = true;
+      for (Map.Entry<String, String> entry : parameters.entrySet()) {
+         if (first) {
+            first = false;
+         } else {
+            stringBuilder.append(PARAMETER_SEPERATOR);
+         }
+         stringBuilder.append(entry.getKey()).append(PARAMETER_KEY_VALUE_SEPERATOR).append(entry.getValue());
+      }
+      return stringBuilder;
+   }
+
+   public SimpleString getAddress() {
+      return address;
+   }
+
+   /**
+    * @return the parsed or supplied queue attributes, or {@code null} when this address
+    *         carries no parameters
+    */
+   public QueueAttributes getQueueAttributes() {
+      return queueAttributes;
+   }
+
+   /**
+    * Creates an instance from an already separated address and attribute set; nothing is parsed.
+    */
+   public ParameterisedAddress(SimpleString address, QueueAttributes queueAttributes) {
+      this.address = address;
+      this.queueAttributes = queueAttributes;
+   }
+
+   /**
+    * Creates an instance from an already separated address and attribute set; nothing is parsed.
+    */
+   public ParameterisedAddress(String address, QueueAttributes queueAttributes) {
+      this(SimpleString.toSimpleString(address), queueAttributes);
+   }
+
+   /**
+    * Parses the given address; see {@link #ParameterisedAddress(String)}.
+    */
+   public ParameterisedAddress(SimpleString address) {
+      this(address.toString());
+   }
+
+   /**
+    * Parses an address that may end in a {@code ?key=value&key=value} parameter section.
+    * <p>
+    * Without a {@code ?} the whole string is the address and {@link #getQueueAttributes()}
+    * is {@code null}.
+    *
+    * @param address the raw address, optionally followed by a parameter section
+    * @throws IllegalArgumentException if any parameter does not split into exactly one key
+    *                                  and one value around {@code =}; this includes an empty
+    *                                  parameter section and a pair with an empty value
+    */
+   public ParameterisedAddress(String address) {
+      int index = address.indexOf(PARAMETER_MARKER);
+      if (index == -1) {
+         this.address = SimpleString.toSimpleString(address);
+         this.queueAttributes = null;
+      } else {
+         this.address = SimpleString.toSimpleString(address.substring(0, index));
+         String parametersString = address.substring(index + 1);
+         String[] parameterPairs = parametersString.split(PARAMETER_SEPERATOR_STRING);
+         QueueAttributes queueAttributes = new QueueAttributes();
+         for (String param : parameterPairs) {
+            String[] keyValue = param.split(PARAMETER_KEY_VALUE_SEPERATOR_STRING);
+            if (keyValue.length != 2) {
+               throw new IllegalArgumentException("Malformed parameter section " + param);
+            } else {
+               queueAttributes.set(keyValue[0], keyValue[1]);
+            }
+         }
+         this.queueAttributes = queueAttributes;
+      }
+   }
+
+   /**
+    * @return {@code true} when this instance holds queue attributes
+    */
+   public boolean isParameterised() {
+      return this.queueAttributes != null;
+   }
+
+   /**
+    * @return {@code true} when the address contains the {@code ?} parameter marker
+    */
+   public static boolean isParameterised(String address) {
+      return address.contains(PARAMETER_MARKER_STRING);
+   }
+
+   /**
+    * @return {@code true} when the address contains the {@code ?} parameter marker
+    */
+   public static boolean isParameterised(SimpleString address) {
+      return address.contains(PARAMETER_MARKER);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
new file mode 100644
index 0000000..6a43b39
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+import java.io.Serializable;
+
+/**
+ * Optional queue settings keyed by their textual names.
+ * <p>
+ * Every attribute is a boxed value and stays {@code null} until it is explicitly set.
+ */
+public class QueueAttributes implements Serializable {
+
+   public static final String MAX_CONSUMERS = "max-consumers";
+   public static final String EXCLUSIVE = "exclusive";
+   public static final String LAST_VALUE = "last-value";
+   public static final String PURGE_ON_NO_CONSUMERS = "purge-on-no-consumers";
+
+   private Integer maxConsumers;
+   private Boolean exclusive;
+   private Boolean lastValue;
+   private Boolean purgeOnNoConsumers;
+
+   /**
+    * Sets the attribute named by {@code key} from its textual {@code value}.
+    * <p>
+    * A {@code null} key, a {@code null} value, or a key that matches none of the
+    * known attribute names leaves this object untouched.
+    *
+    * @param key   one of the attribute name constants declared on this class
+    * @param value the value to convert with {@code Integer.valueOf} or {@code Boolean.valueOf}
+    */
+   public void set(String key, String value) {
+      if (key == null || value == null) {
+         return;
+      }
+      switch (key) {
+         case MAX_CONSUMERS:
+            setMaxConsumers(Integer.valueOf(value));
+            break;
+         case EXCLUSIVE:
+            setExclusive(Boolean.valueOf(value));
+            break;
+         case LAST_VALUE:
+            setLastValue(Boolean.valueOf(value));
+            break;
+         case PURGE_ON_NO_CONSUMERS:
+            setPurgeOnNoConsumers(Boolean.valueOf(value));
+            break;
+         default:
+            // unknown keys are ignored
+            break;
+      }
+   }
+
+   public Integer getMaxConsumers() {
+      return this.maxConsumers;
+   }
+
+   public void setMaxConsumers(Integer maxConsumers) {
+      this.maxConsumers = maxConsumers;
+   }
+
+   public Boolean getExclusive() {
+      return this.exclusive;
+   }
+
+   public void setExclusive(Boolean exclusive) {
+      this.exclusive = exclusive;
+   }
+
+   public Boolean getLastValue() {
+      return this.lastValue;
+   }
+
+   public void setLastValue(Boolean lastValue) {
+      this.lastValue = lastValue;
+   }
+
+   public Boolean getPurgeOnNoConsumers() {
+      return this.purgeOnNoConsumers;
+   }
+
+   public void setPurgeOnNoConsumers(Boolean purgeOnNoConsumers) {
+      this.purgeOnNoConsumers = purgeOnNoConsumers;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index fe7e1bf..87f6dad 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -465,6 +465,10 @@ public final class ActiveMQDefaultConfiguration {
 
    public static final int DEFAULT_MAX_QUEUE_CONSUMERS = -1;
 
+   public static final boolean DEFAULT_EXCLUSIVE = false;
+
+   public static final boolean DEFAULT_LAST_VALUE = false;
+
    public static final boolean DEFAULT_PURGE_ON_NO_CONSUMERS = false;
 
    public static final RoutingType DEFAULT_ROUTING_TYPE = 
RoutingType.MULTICAST;
@@ -1277,6 +1281,14 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_MAX_QUEUE_CONSUMERS;
    }
 
+   public static boolean getDefaultExclusive() {
+      return DEFAULT_EXCLUSIVE;
+   }
+
+   /**
+    * Returns the default last-value setting, {@link #DEFAULT_LAST_VALUE}.
+    *
+    * @return the value of {@code DEFAULT_LAST_VALUE}
+    */
+   public static boolean getDefaultLastValue() {
+      return DEFAULT_LAST_VALUE;
+   }
+
    public static boolean getDefaultPurgeOnNoConsumers() {
       return DEFAULT_PURGE_ON_NO_CONSUMERS;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index ce56013..ddc8168 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -75,6 +75,10 @@ public interface ClientSession extends XAResource, 
AutoCloseable {
       boolean isDefaultPurgeOnNoConsumers();
 
       int getDefaultMaxConsumers();
+
+      Boolean isDefaultLastValueQueue();
+
+      Boolean isDefaultExclusive();
    }
 
    /**
@@ -139,6 +143,10 @@ public interface ClientSession extends XAResource, 
AutoCloseable {
       boolean isPurgeOnNoConsumers();
 
       boolean isAutoCreated();
+
+      Boolean isExclusive();
+
+      Boolean isLastValue();
    }
 
    // Lifecycle operations ------------------------------------------
@@ -455,6 +463,23 @@ public interface ClientSession extends XAResource, 
AutoCloseable {
                           boolean durable) throws ActiveMQException;
 
    /**
+    * Creates Shared queue. A queue that will exist as long as there are 
consumers or is durable.
+    *
+    * @param address   the queue will be bound to this address
+    * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+    * @param queueName the name of the queue
+    * @param filter    only messages which match this filter will be put in the queue
+    * @param durable   if the queue is durable
+    * @param maxConsumers how many concurrent consumers will be allowed on 
this queue
+    * @param purgeOnNoConsumers whether to delete the contents of the queue 
when the last consumer disconnects
+    * @param exclusive    if the queue is exclusive queue
+    * @param lastValue    if the queue is last value queue
+    * @throws ActiveMQException if an exception occurs while creating the queue
+    */
+   void createSharedQueue(SimpleString address, RoutingType routingType, 
SimpleString queueName, SimpleString filter,
+                          boolean durable, Integer maxConsumers, Boolean 
purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws 
ActiveMQException;
+
+   /**
     * Creates a <em>non-temporary</em> queue.
     *
     * @param address   the queue will be bound to this address
@@ -541,6 +566,24 @@ public interface ClientSession extends XAResource, 
AutoCloseable {
                     boolean durable, boolean autoCreated, int maxConsumers, 
boolean purgeOnNoConsumers) throws ActiveMQException;
 
    /**
+    * Creates a <em>non-temporary</em> queue.
+    *
+    * @param address      the queue will be bound to this address
+    * @param routingType  the delivery mode for this queue, MULTICAST or 
ANYCAST
+    * @param queueName    the name of the queue
+    * @param filter       only messages which match this filter will be put in 
the queue
+    * @param durable      whether the queue is durable or not
+    * @param autoCreated  whether to mark this queue as autoCreated or not
+    * @param maxConsumers how many concurrent consumers will be allowed on 
this queue
+    * @param purgeOnNoConsumers whether to delete the contents of the queue 
when the last consumer disconnects
+    * @param exclusive whether the queue should be exclusive
+    * @param lastValue whether the queue should be lastValue
+    * @throws ActiveMQException
+    */
+   void createQueue(SimpleString address, RoutingType routingType, 
SimpleString queueName, SimpleString filter,
+                    boolean durable, boolean autoCreated, int maxConsumers, 
boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws 
ActiveMQException;
+
+   /**
     * Creates a <em>non-temporary</em>queue.
     *
     * @param address     the queue will be bound to this address
@@ -570,6 +613,24 @@ public interface ClientSession extends XAResource, 
AutoCloseable {
                            int maxConsumers, boolean purgeOnNoConsumers) 
throws ActiveMQException;
 
    /**
+    * Creates a <em>non-temporary</em>queue.
+    *
+    * @param address     the queue will be bound to this address
+    * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+    * @param queueName   the name of the queue
+    * @param filter      only messages which match this filter will be put in 
the queue
+    * @param durable     whether the queue is durable or not
+    * @param autoCreated whether to mark this queue as autoCreated or not
+    * @param maxConsumers how many concurrent consumers will be allowed on 
this queue
+    * @param purgeOnNoConsumers whether to delete the contents of the queue 
when the last consumer disconnects
+    * @param exclusive whether the queue should be exclusive
+    * @param lastValue whether the queue should be lastValue
+    * @throws ActiveMQException
+    */
+   void createQueue(String address, RoutingType routingType, String queueName, 
String filter, boolean durable, boolean autoCreated,
+                    int maxConsumers, boolean purgeOnNoConsumers, Boolean 
exclusive, Boolean lastValue) throws ActiveMQException;
+
+   /**
     * Creates a <em>temporary</em> queue.
     *
     * @param address   the queue will be bound to this address
@@ -596,6 +657,22 @@ public interface ClientSession extends XAResource, 
AutoCloseable {
     * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
     * @param queueName the name of the queue
     * @param filter    only messages which match this filter will be put in 
the queue
+    * @param maxConsumers how many concurrent consumers will be allowed on 
this queue
+    * @param purgeOnNoConsumers whether to delete the contents of the queue 
when the last consumer disconnects
+    * @param exclusive    if the queue is exclusive queue
+    * @param lastValue    if the queue is last value queue
+    * @throws ActiveMQException if an exception occurs while creating the queue
+    */
+   void createTemporaryQueue(SimpleString address, RoutingType routingType, 
SimpleString queueName, SimpleString filter, int maxConsumers,
+                             boolean purgeOnNoConsumers, Boolean exclusive, 
Boolean lastValue) throws ActiveMQException;
+
+   /**
+    * Creates a <em>temporary</em> queue with a filter.
+    *
+    * @param address   the queue will be bound to this address
+    * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+    * @param queueName the name of the queue
+    * @param filter    only messages which match this filter will be put in 
the queue
     * @throws ActiveMQException in an exception occurs while creating the queue
     */
    void createTemporaryQueue(SimpleString address, RoutingType routingType, 
SimpleString queueName, SimpleString filter) throws ActiveMQException;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 7c9a40a..fc9de24 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -617,6 +617,22 @@ public interface ActiveMQServerControl {
                       @Parameter(name = "purgeOnNoConsumers", desc = "Delete 
this queue when the last consumer disconnects") Boolean purgeOnNoConsumers) 
throws Exception;
 
 
+   /**
+    * Update a queue.
+    *
+    * @param name                name of the queue
+    * @param routingType         the routing type used for this address, 
{@code MULTICAST} or {@code ANYCAST}
+    * @param maxConsumers        the maximum number of consumers allowed on 
this queue at any one time
+    * @param purgeOnNoConsumers  delete this queue when the last consumer disconnects
+    * @param exclusive           if the queue should route exclusively to one consumer
+    * @return a textual summary of the queue
+    * @throws Exception
+    */
+   String updateQueue(@Parameter(name = "name", desc = "Name of the queue") 
String name,
+                      @Parameter(name = "routingType", desc = "The routing 
type used for this address, MULTICAST or ANYCAST") String routingType,
+                      @Parameter(name = "maxConsumers", desc = "The maximum 
number of consumers allowed on this queue at any one time") Integer 
maxConsumers,
+                      @Parameter(name = "purgeOnNoConsumers", desc = "Delete 
this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
+                      @Parameter(name = "exclusive", desc = "If the queue 
should route exclusively to one consumer") Boolean exclusive) throws Exception;
+
 
    /**
     * Deploy a durable queue.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 9eec8e0..447417f 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -164,6 +164,18 @@ public interface QueueControl {
    @Attribute(desc = "delete this queue when the last consumer disconnects")
    boolean isPurgeOnNoConsumers();
 
+   /**
+    * Returns whether this queue routes exclusively to one consumer.
+    */
+   @Attribute(desc = "If the queue should route exclusively to one consumer")
+   boolean isExclusive();
+
+   /**
+    * Returns whether this queue is a last value queue.
+    */
+   @Attribute(desc = "is this queue a last value queue")
+   boolean isLastValue();
+
    // Operations ----------------------------------------------------
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java
index 79e2cc9..cb91919 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AddressQueryImpl.java
@@ -36,18 +36,26 @@ public class AddressQueryImpl implements 
ClientSession.AddressQuery {
 
    private final int defaultMaxConsumers;
 
+   private final Boolean defaultExclusive;
+
+   private final Boolean defaultLastValue;
+
    public AddressQueryImpl(final boolean exists,
                            final List<SimpleString> queueNames,
                            final boolean autoCreateQueues,
                            final boolean autoCreateAddresses,
                            final boolean defaultPurgeOnNoConsumers,
-                           final int defaultMaxConsumers) {
+                           final int defaultMaxConsumers,
+                           final Boolean defaultExclusive,
+                           final Boolean defaultLastValue) {
       this.exists = exists;
       this.queueNames = new ArrayList<>(queueNames);
       this.autoCreateQueues = autoCreateQueues;
       this.autoCreateAddresses = autoCreateAddresses;
       this.defaultPurgeOnNoConsumers = defaultPurgeOnNoConsumers;
       this.defaultMaxConsumers = defaultMaxConsumers;
+      this.defaultExclusive = defaultExclusive;
+      this.defaultLastValue = defaultLastValue;
    }
 
    @Override
@@ -79,4 +87,14 @@ public class AddressQueryImpl implements 
ClientSession.AddressQuery {
    public int getDefaultMaxConsumers() {
       return defaultMaxConsumers;
    }
+
+   @Override
+   public Boolean isDefaultLastValueQueue() {
+      return defaultLastValue;
+   }
+
+   @Override
+   public Boolean isDefaultExclusive() {
+      return defaultExclusive;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index e5e91db..ab9888e 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -376,7 +376,7 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                           false,
                           
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
                           
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),
-                          autoCreated);
+                          autoCreated, null, null);
    }
 
    @Override
@@ -400,12 +400,42 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                           false,
                           maxConsumers,
                           purgeOnNoConsumers,
-                          autoCreated);
+                          autoCreated, null, null);
+   }
+
+   @Override
+   public void createQueue(final SimpleString address, final RoutingType 
routingType, final SimpleString queueName, final SimpleString filterString,
+                           final boolean durable, final boolean autoCreated, 
final int maxConsumers, final boolean purgeOnNoConsumers, final Boolean 
exclusive, final Boolean lastValue) throws ActiveMQException {
+      internalCreateQueue(address,
+                          queueName, routingType,
+                          filterString,
+                          durable,
+                          false,
+                          maxConsumers,
+                          purgeOnNoConsumers,
+                          autoCreated,
+                          exclusive,
+                          lastValue);
    }
 
    @Override
    public void createQueue(final String address, final RoutingType 
routingType, final String queueName, final String filterString,
                            final boolean durable, final boolean autoCreated, 
final int maxConsumers, final boolean purgeOnNoConsumers) throws 
ActiveMQException {
+      createQueue(address,
+                  routingType,
+                  queueName,
+                  filterString,
+                  durable,
+                  autoCreated,
+                  maxConsumers,
+                  purgeOnNoConsumers,
+                  null,
+                  null);
+   }
+
+   @Override
+   public void createQueue(final String address, final RoutingType 
routingType, final String queueName, final String filterString,
+                           final boolean durable, final boolean autoCreated, 
final int maxConsumers, final boolean purgeOnNoConsumers, Boolean exclusive, 
Boolean lastValue) throws ActiveMQException {
       createQueue(SimpleString.toSimpleString(address),
                   routingType,
                   SimpleString.toSimpleString(queueName),
@@ -413,7 +443,9 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                   durable,
                   autoCreated,
                   maxConsumers,
-                  purgeOnNoConsumers);
+                  purgeOnNoConsumers,
+                  exclusive,
+                  lastValue);
    }
 
    @Override
@@ -432,15 +464,27 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
    public void createTemporaryQueue(final SimpleString address,
                                     final RoutingType routingType,
                                     final SimpleString queueName,
-                                    final SimpleString filter) throws 
ActiveMQException {
+                                    final SimpleString filter,
+                                    final int maxConsumers,
+                                    final boolean purgeOnNoConsumers,
+                                    final Boolean exclusive,
+                                    final Boolean lastValue) throws 
ActiveMQException {
       internalCreateQueue(address,
                           queueName, routingType,
                           filter,
                           false,
                           true,
-                          
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
-                          
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),
-                          false);
+                          maxConsumers,
+                          purgeOnNoConsumers,
+                          false, exclusive, lastValue);
+   }
+
+   @Override
+   public void createTemporaryQueue(final SimpleString address,
+                                    final RoutingType routingType,
+                                    final SimpleString queueName,
+                                    final SimpleString filter) throws 
ActiveMQException {
+      createTemporaryQueue(address, routingType, queueName, filter, 
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), 
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), null, null);
    }
 
    @Override
@@ -466,7 +510,7 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                           false,
                           
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
                           
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),
-                          false);
+                          false, null, null);
    }
 
    /**
@@ -500,11 +544,31 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
    @Override
    public void createSharedQueue(SimpleString address, RoutingType 
routingType, SimpleString queueName, SimpleString filter,
                                  boolean durable) throws ActiveMQException {
+      createSharedQueue(address, routingType, queueName, filter, durable, 
null, null, null, null);
+   }
+
+   /**
+    * Creates a shared queue: a queue that exists as long as it has 
consumers or is durable.
+    *
+    * @param address      the queue will be bound to this address
+    * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+    * @param queueName    the name of the queue
+    * @param filter       the filter applied to the queue
+    * @param durable      if the queue is durable
+    * @param maxConsumers how many concurrent consumers will be allowed on 
this queue
+    * @param purgeOnNoConsumers whether to delete the contents of the queue 
when the last consumer disconnects
+    * @param exclusive    if the queue is an exclusive queue
+    * @param lastValue    if the queue is a last-value queue
+    * @throws ActiveMQException if an exception occurs while creating the queue
+    */
+   @Override
+   public void createSharedQueue(SimpleString address, RoutingType 
routingType, SimpleString queueName, SimpleString filter,
+                                 boolean durable, Integer maxConsumers, 
Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws 
ActiveMQException {
       checkClosed();
 
       startCall();
       try {
-         sessionContext.createSharedQueue(address, queueName, routingType, 
filter, durable);
+         sessionContext.createSharedQueue(address, queueName, routingType, 
filter, durable, maxConsumers, purgeOnNoConsumers, exclusive, lastValue);
       } finally {
          endCall();
       }
@@ -541,7 +605,7 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                           false,
                           
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
                           
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),
-                          false);
+                          false, null, null);
    }
 
    /**
@@ -562,7 +626,7 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                           false,
                           
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
                           
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),
-                          false);
+                          false, null, null);
    }
 
    /**
@@ -586,7 +650,7 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                           false,
                           
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
                           
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),
-                          false);
+                          false, null, null);
    }
 
    /**
@@ -1847,7 +1911,9 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                                     final boolean temp,
                                     final int maxConsumers,
                                     final boolean purgeOnNoConsumers,
-                                    final boolean autoCreated) throws 
ActiveMQException {
+                                    final boolean autoCreated,
+                                    final Boolean exclusive,
+                                    final Boolean lastValue) throws 
ActiveMQException {
       checkClosed();
 
       if (durable && temp) {
@@ -1864,7 +1930,9 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
                                     temp,
                                     maxConsumers,
                                     purgeOnNoConsumers,
-                                    autoCreated);
+                                    autoCreated,
+                                    exclusive,
+                                    lastValue);
       } finally {
          endCall();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index 71488c2..8dc35a9 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -48,6 +48,10 @@ public class QueueQueryImpl implements 
ClientSession.QueueQuery {
 
    private final int maxConsumers;
 
+   private final Boolean exclusive;
+
+   private final Boolean lastValue;
+
    public QueueQueryImpl(final boolean durable,
                          final boolean temporary,
                          final int consumerCount,
@@ -84,6 +88,23 @@ public class QueueQueryImpl implements 
ClientSession.QueueQuery {
                          final boolean autoCreated,
                          final boolean purgeOnNoConsumers,
                          final RoutingType routingType) {
+      this(durable, temporary, consumerCount, messageCount, filterString, 
address, name, exists, autoCreateQueues, maxConsumers, autoCreated, 
purgeOnNoConsumers, routingType, null, null);
+   }
+   public QueueQueryImpl(final boolean durable,
+                         final boolean temporary,
+                         final int consumerCount,
+                         final long messageCount,
+                         final SimpleString filterString,
+                         final SimpleString address,
+                         final SimpleString name,
+                         final boolean exists,
+                         final boolean autoCreateQueues,
+                         final int maxConsumers,
+                         final boolean autoCreated,
+                         final boolean purgeOnNoConsumers,
+                         final RoutingType routingType,
+                         final Boolean exclusive,
+                         final Boolean lastValue) {
       this.durable = durable;
       this.temporary = temporary;
       this.consumerCount = consumerCount;
@@ -97,6 +118,8 @@ public class QueueQueryImpl implements 
ClientSession.QueueQuery {
       this.autoCreated = autoCreated;
       this.purgeOnNoConsumers = purgeOnNoConsumers;
       this.routingType = routingType;
+      this.exclusive = exclusive;
+      this.lastValue = lastValue;
    }
 
    @Override
@@ -164,5 +187,15 @@ public class QueueQueryImpl implements 
ClientSession.QueueQuery {
       return autoCreated;
    }
 
+   @Override
+   public Boolean isExclusive() {
+      return exclusive;
+   }
+
+   @Override
+   public Boolean isLastValue() {
+      return lastValue;
+   }
+
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 3a4878b..6037925 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -249,8 +249,21 @@ public class ActiveMQSessionContext extends SessionContext 
{
                                  SimpleString queueName,
                                  RoutingType routingType,
                                  SimpleString filterString,
+                                 boolean durable,
+                                 Integer maxConsumers,
+                                 Boolean purgeOnNoConsumers,
+                                 Boolean exclusive,
+                                 Boolean lastValue) throws ActiveMQException {
+      sessionChannel.sendBlocking(new CreateSharedQueueMessage_V2(address, 
queueName, routingType, filterString, durable, maxConsumers, 
purgeOnNoConsumers, exclusive, lastValue, true), PacketImpl.NULL_RESPONSE);
+   }
+
+   @Override
+   public void createSharedQueue(SimpleString address,
+                                 SimpleString queueName,
+                                 RoutingType routingType,
+                                 SimpleString filterString,
                                  boolean durable) throws ActiveMQException {
-      sessionChannel.sendBlocking(new CreateSharedQueueMessage_V2(address, 
queueName, routingType, filterString, durable, true), PacketImpl.NULL_RESPONSE);
+      createSharedQueue(address, queueName, routingType, filterString, 
durable, null, null, null, null);
    }
 
    @Override
@@ -325,19 +338,19 @@ public class ActiveMQSessionContext extends 
SessionContext {
       if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4, 
getServerVersion())) {
          Packet packet = sessionChannel.sendBlocking(new 
SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V4);
          SessionBindingQueryResponseMessage_V4 response = 
(SessionBindingQueryResponseMessage_V4) packet;
-         return new AddressQueryImpl(response.isExists(), 
response.getQueueNames(), response.isAutoCreateQueues(), 
response.isAutoCreateAddresses(), response.isDefaultPurgeOnNoConsumers(), 
response.getDefaultMaxConsumers());
+         return new AddressQueryImpl(response.isExists(), 
response.getQueueNames(), response.isAutoCreateQueues(), 
response.isAutoCreateAddresses(), response.isDefaultPurgeOnNoConsumers(), 
response.getDefaultMaxConsumers(), response.isDefaultExclusive(), 
response.isDefaultLastValue());
       } else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3, 
getServerVersion())) {
          Packet packet = sessionChannel.sendBlocking(new 
SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3);
          SessionBindingQueryResponseMessage_V3 response = 
(SessionBindingQueryResponseMessage_V3) packet;
-         return new AddressQueryImpl(response.isExists(), 
response.getQueueNames(), response.isAutoCreateQueues(), 
response.isAutoCreateAddresses(), 
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), 
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
+         return new AddressQueryImpl(response.isExists(), 
response.getQueueNames(), response.isAutoCreateQueues(), 
response.isAutoCreateAddresses(), 
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), 
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null);
       } else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2, 
getServerVersion())) {
          Packet packet = sessionChannel.sendBlocking(new 
SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2);
          SessionBindingQueryResponseMessage_V2 response = 
(SessionBindingQueryResponseMessage_V2) packet;
-         return new AddressQueryImpl(response.isExists(), 
response.getQueueNames(), response.isAutoCreateQueues(), false, 
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), 
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
+         return new AddressQueryImpl(response.isExists(), 
response.getQueueNames(), response.isAutoCreateQueues(), false, 
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), 
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null);
       } else {
          Packet packet = sessionChannel.sendBlocking(new 
SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
          SessionBindingQueryResponseMessage response = 
(SessionBindingQueryResponseMessage) packet;
-         return new AddressQueryImpl(response.isExists(), 
response.getQueueNames(), false, false, 
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), 
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
+         return new AddressQueryImpl(response.isExists(), 
response.getQueueNames(), false, false, 
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), 
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null);
       }
    }
 
@@ -647,16 +660,32 @@ public class ActiveMQSessionContext extends 
SessionContext {
                            boolean temp,
                            int maxConsumers,
                            boolean purgeOnNoConsumers,
-                           boolean autoCreated) throws ActiveMQException {
+                           boolean autoCreated,
+                           Boolean exclusive,
+                           Boolean lastValue) throws ActiveMQException {
       if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
          CreateQueueMessage request = new CreateQueueMessage(address, 
queueName, filterString, durable, temp, true);
          sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
       } else {
-         CreateQueueMessage request = new CreateQueueMessage_V2(address, 
queueName, routingType, filterString, durable, temp, maxConsumers, 
purgeOnNoConsumers, autoCreated, true);
+         CreateQueueMessage request = new CreateQueueMessage_V2(address, 
queueName, routingType, filterString, durable, temp, maxConsumers, 
purgeOnNoConsumers, autoCreated, true, exclusive, lastValue);
          sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
       }
    }
 
+   @Deprecated
+   @Override
+   public void createQueue(SimpleString address,
+                           RoutingType routingType,
+                           SimpleString queueName,
+                           SimpleString filterString,
+                           boolean durable,
+                           boolean temp,
+                           int maxConsumers,
+                           boolean purgeOnNoConsumers,
+                           boolean autoCreated) throws ActiveMQException {
+      createQueue(address, routingType, queueName, filterString, durable, 
temp, maxConsumers, purgeOnNoConsumers, autoCreated, null, null);
+   }
+
    @Override
    public boolean reattachOnNewConnection(RemotingConnection newConnection) 
throws ActiveMQException {
 
@@ -741,7 +770,7 @@ public class ActiveMQSessionContext extends SessionContext {
       // they are defined in broker.xml
       // This allows e.g. JMS non durable subs and temporary queues to 
continue to be used after failover
       if (!queueInfo.isDurable()) {
-         CreateQueueMessage_V2 createQueueRequest = new 
CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), 
queueInfo.getRoutingType(), queueInfo.getFilterString(), false, 
queueInfo.isTemporary(), queueInfo.getMaxConsumers(), 
queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false);
+         CreateQueueMessage_V2 createQueueRequest = new 
CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), 
queueInfo.getRoutingType(), queueInfo.getFilterString(), false, 
queueInfo.isTemporary(), queueInfo.getMaxConsumers(), 
queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, 
queueInfo.isExclusive(), queueInfo.isLastValue());
 
          sendPacketWithoutLock(sessionChannel, createQueueRequest);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
index 4c4b737..4ffeb5c 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
@@ -19,6 +19,7 @@ package 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.utils.BufferHelper;
 
 public class CreateQueueMessage_V2 extends CreateQueueMessage {
 
@@ -30,6 +31,10 @@ public class CreateQueueMessage_V2 extends 
CreateQueueMessage {
 
    private boolean purgeOnNoConsumers;
 
+   private Boolean exclusive;
+
+   private Boolean lastValue;
+
    public CreateQueueMessage_V2(final SimpleString address,
                                 final SimpleString queueName,
                                 final RoutingType routingType,
@@ -39,7 +44,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage 
{
                                 final int maxConsumers,
                                 final boolean purgeOnNoConsumers,
                                 final boolean autoCreated,
-                                final boolean requiresResponse) {
+                                final boolean requiresResponse,
+                                final Boolean exclusive,
+                                final Boolean lastValue) {
       this();
 
       this.address = address;
@@ -52,6 +59,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage 
{
       this.routingType = routingType;
       this.maxConsumers = maxConsumers;
       this.purgeOnNoConsumers = purgeOnNoConsumers;
+      this.exclusive = exclusive;
+      this.lastValue = lastValue;
    }
 
    public CreateQueueMessage_V2() {
@@ -67,6 +76,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage 
{
       buff.append(", routingType=" + routingType);
       buff.append(", maxConsumers=" + maxConsumers);
       buff.append(", purgeOnNoConsumers=" + purgeOnNoConsumers);
+      buff.append(", exclusive=" + exclusive);
+      buff.append(", lastValue=" + lastValue);
       buff.append("]");
       return buff.toString();
    }
@@ -103,6 +114,22 @@ public class CreateQueueMessage_V2 extends 
CreateQueueMessage {
       this.autoCreated = autoCreated;
    }
 
+   public Boolean isExclusive() {
+      return exclusive;
+   }
+
+   public void setExclusive(Boolean exclusive) {
+      this.exclusive = exclusive;
+   }
+
+   public Boolean isLastValue() {
+      return lastValue;
+   }
+
+   public void setLastValue(Boolean lastValue) {
+      this.lastValue = lastValue;
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
@@ -110,6 +137,8 @@ public class CreateQueueMessage_V2 extends 
CreateQueueMessage {
       buffer.writeByte(routingType == null ? -1 : routingType.getType());
       buffer.writeInt(maxConsumers);
       buffer.writeBoolean(purgeOnNoConsumers);
+      BufferHelper.writeNullableBoolean(buffer, exclusive);
+      BufferHelper.writeNullableBoolean(buffer, lastValue);
    }
 
    @Override
@@ -119,6 +148,10 @@ public class CreateQueueMessage_V2 extends 
CreateQueueMessage {
       routingType = RoutingType.getType(buffer.readByte());
       maxConsumers = buffer.readInt();
       purgeOnNoConsumers = buffer.readBoolean();
+      if (buffer.readableBytes() > 0) {
+         exclusive = BufferHelper.readNullableBoolean(buffer);
+         lastValue = BufferHelper.readNullableBoolean(buffer);
+      }
    }
 
    @Override
@@ -129,6 +162,8 @@ public class CreateQueueMessage_V2 extends 
CreateQueueMessage {
       result = prime * result + (routingType.getType());
       result = prime * result + (maxConsumers);
       result = prime * result + (purgeOnNoConsumers ? 1231 : 1237);
+      result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 
1237);
+      result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 
1237);
       return result;
    }
 
@@ -147,7 +182,9 @@ public class CreateQueueMessage_V2 extends 
CreateQueueMessage {
          return false;
       if (purgeOnNoConsumers != other.purgeOnNoConsumers)
          return false;
-      if (purgeOnNoConsumers != other.purgeOnNoConsumers)
+      if (exclusive != other.exclusive)
+         return false;
+      if (lastValue != other.lastValue)
          return false;
       if (routingType == null) {
          if (other.routingType != null)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
index beebd12..8763043 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
@@ -19,16 +19,25 @@ package 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.utils.BufferHelper;
 
 public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
 
    private RoutingType routingType;
+   Integer maxConsumers;
+   Boolean purgeOnNoConsumers;
+   private Boolean exclusive;
+   private Boolean lastValue;
 
    public CreateSharedQueueMessage_V2(final SimpleString address,
                                       final SimpleString queueName,
                                       final RoutingType routingType,
                                       final SimpleString filterString,
                                       final boolean durable,
+                                      final Integer maxConsumers,
+                                      final Boolean purgeOnNoConsumers,
+                                      final Boolean exclusive,
+                                      final Boolean lastValue,
                                       final boolean requiresResponse) {
       this();
 
@@ -36,8 +45,13 @@ public class CreateSharedQueueMessage_V2 extends 
CreateSharedQueueMessage {
       this.queueName = queueName;
       this.filterString = filterString;
       this.durable = durable;
-      this.requiresResponse = requiresResponse;
       this.routingType = routingType;
+      this.maxConsumers = maxConsumers;
+      this.purgeOnNoConsumers = purgeOnNoConsumers;
+      this.exclusive = exclusive;
+      this.lastValue = lastValue;
+      this.requiresResponse = requiresResponse;
+
    }
 
    public CreateSharedQueueMessage_V2() {
@@ -52,6 +66,38 @@ public class CreateSharedQueueMessage_V2 extends 
CreateSharedQueueMessage {
       this.routingType = routingType;
    }
 
+   public Integer getMaxConsumers() {
+      return maxConsumers;
+   }
+
+   public void setMaxConsumers(Integer maxConsumers) {
+      this.maxConsumers = maxConsumers;
+   }
+
+   public Boolean isPurgeOnNoConsumers() {
+      return purgeOnNoConsumers;
+   }
+
+   public void setPurgeOnNoConsumers(Boolean purgeOnNoConsumers) {
+      this.purgeOnNoConsumers = purgeOnNoConsumers;
+   }
+
+   public Boolean isExclusive() {
+      return exclusive;
+   }
+
+   public void setExclusive(Boolean exclusive) {
+      this.exclusive = exclusive;
+   }
+
+   public Boolean isLastValue() {
+      return lastValue;
+   }
+
+   public void setLastValue(Boolean lastValue) {
+      this.lastValue = lastValue;
+   }
+
    @Override
    public String toString() {
       StringBuffer buff = new StringBuffer(getParentString());
@@ -59,6 +105,11 @@ public class CreateSharedQueueMessage_V2 extends 
CreateSharedQueueMessage {
       buff.append(", queueName=" + queueName);
       buff.append(", filterString=" + filterString);
       buff.append(", durable=" + durable);
+      buff.append(", routingType=" + routingType);
+      buff.append(", maxConsumers=" + maxConsumers);
+      buff.append(", purgeOnNoConsumers=" + purgeOnNoConsumers);
+      buff.append(", exclusive=" + exclusive);
+      buff.append(", lastValue=" + lastValue);
       buff.append(", requiresResponse=" + requiresResponse);
       buff.append("]");
       return buff.toString();
@@ -72,6 +123,10 @@ public class CreateSharedQueueMessage_V2 extends 
CreateSharedQueueMessage {
       buffer.writeBoolean(durable);
       buffer.writeByte(routingType.getType());
       buffer.writeBoolean(requiresResponse);
+      BufferHelper.writeNullableInteger(buffer, maxConsumers);
+      BufferHelper.writeNullableBoolean(buffer, purgeOnNoConsumers);
+      BufferHelper.writeNullableBoolean(buffer, exclusive);
+      BufferHelper.writeNullableBoolean(buffer, lastValue);
    }
 
    @Override
@@ -82,6 +137,12 @@ public class CreateSharedQueueMessage_V2 extends 
CreateSharedQueueMessage {
       durable = buffer.readBoolean();
       routingType = RoutingType.getType(buffer.readByte());
       requiresResponse = buffer.readBoolean();
+      if (buffer.readableBytes() > 0) {
+         maxConsumers = BufferHelper.readNullableInteger(buffer);
+         purgeOnNoConsumers = BufferHelper.readNullableBoolean(buffer);
+         exclusive = BufferHelper.readNullableBoolean(buffer);
+         lastValue = BufferHelper.readNullableBoolean(buffer);
+      }
    }
 
    @Override
@@ -94,6 +155,10 @@ public class CreateSharedQueueMessage_V2 extends 
CreateSharedQueueMessage {
       result = prime * result + (durable ? 1231 : 1237);
       result = prime * result + routingType.getType();
       result = prime * result + (requiresResponse ? 1231 : 1237);
+      result = prime * result + (maxConsumers == null ? 0 : 
maxConsumers.hashCode());
+      result = prime * result + (purgeOnNoConsumers == null ? 0 : 
purgeOnNoConsumers ? 1231 : 1237);
+      result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 
1237);
+      result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 
1237);
       return result;
    }
 
@@ -127,6 +192,14 @@ public class CreateSharedQueueMessage_V2 extends 
CreateSharedQueueMessage {
          return false;
       if (requiresResponse != other.requiresResponse)
          return false;
+      if (maxConsumers != other.maxConsumers)
+         return false;
+      if (purgeOnNoConsumers != other.purgeOnNoConsumers)
+         return false;
+      if (exclusive != other.exclusive)
+         return false;
+      if (lastValue != other.lastValue)
+         return false;
       return true;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java
index dc0e958..0d08d99 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryResponseMessage_V4.java
@@ -20,6 +20,7 @@ import java.util.List;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.utils.BufferHelper;
 
 public class SessionBindingQueryResponseMessage_V4 extends 
SessionBindingQueryResponseMessage_V3 {
 
@@ -27,12 +28,18 @@ public class SessionBindingQueryResponseMessage_V4 extends 
SessionBindingQueryRe
 
    private int defaultMaxConsumers;
 
+   private Boolean defaultExclusive;
+
+   private Boolean defaultLastValue;
+
    public SessionBindingQueryResponseMessage_V4(final boolean exists,
                                                 final List<SimpleString> 
queueNames,
                                                 final boolean autoCreateQueues,
                                                 final boolean 
autoCreateAddresses,
                                                 final boolean 
defaultPurgeOnNoConsumers,
-                                                final int defaultMaxConsumers) 
{
+                                                final int defaultMaxConsumers,
+                                                final Boolean defaultExclusive,
+                                                final Boolean 
defaultLastValue) {
       super(SESS_BINDINGQUERY_RESP_V4);
 
       this.exists = exists;
@@ -46,6 +53,10 @@ public class SessionBindingQueryResponseMessage_V4 extends 
SessionBindingQueryRe
       this.defaultPurgeOnNoConsumers = defaultPurgeOnNoConsumers;
 
       this.defaultMaxConsumers = defaultMaxConsumers;
+
+      this.defaultExclusive = defaultExclusive;
+
+      this.defaultLastValue = defaultLastValue;
    }
 
    public SessionBindingQueryResponseMessage_V4() {
@@ -60,11 +71,21 @@ public class SessionBindingQueryResponseMessage_V4 extends 
SessionBindingQueryRe
       return defaultMaxConsumers;
    }
 
+   /**
+    * @return the default "exclusive" value carried by this response; may be {@code null}, because
+    *         the field is a boxed {@code Boolean} that {@code decodeRest} only populates when the
+    *         buffer still has readable bytes
+    */
+   public Boolean isDefaultExclusive() {
+      return defaultExclusive;
+   }
+
+   /**
+    * @return the default "last value" setting carried by this response; may be {@code null}, because
+    *         the field is a boxed {@code Boolean} that {@code decodeRest} only populates when the
+    *         buffer still has readable bytes
+    */
+   public Boolean isDefaultLastValue() {
+      return defaultLastValue;
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
       buffer.writeBoolean(defaultPurgeOnNoConsumers);
       buffer.writeInt(defaultMaxConsumers);
+      // The two nullable flags are appended after the pre-existing fields, in the same order
+      // decodeRest reads them back (decodeRest skips them when no bytes remain in the buffer).
+      BufferHelper.writeNullableBoolean(buffer, defaultExclusive);
+      BufferHelper.writeNullableBoolean(buffer, defaultLastValue);
    }
 
    @Override
@@ -72,6 +93,10 @@ public class SessionBindingQueryResponseMessage_V4 extends 
SessionBindingQueryRe
       super.decodeRest(buffer);
       defaultPurgeOnNoConsumers = buffer.readBoolean();
       defaultMaxConsumers = buffer.readInt();
+      if (buffer.readableBytes() > 0) {
+         defaultExclusive = BufferHelper.readNullableBoolean(buffer);
+         defaultLastValue = BufferHelper.readNullableBoolean(buffer);
+      }
    }
 
    @Override
@@ -80,6 +105,8 @@ public class SessionBindingQueryResponseMessage_V4 extends 
SessionBindingQueryRe
       int result = super.hashCode();
       result = prime * result + (defaultPurgeOnNoConsumers ? 1231 : 1237);
       result = prime * result + defaultMaxConsumers;
+      result = prime * result + (defaultExclusive == null ? 0 : 
defaultExclusive ? 1231 : 1237);
+      result = prime * result + (defaultLastValue == null ? 0 : 
defaultLastValue ? 1231 : 1237);
       return result;
    }
 
@@ -95,6 +122,8 @@ public class SessionBindingQueryResponseMessage_V4 extends 
SessionBindingQueryRe
       StringBuffer buff = new StringBuffer(super.getParentString());
       buff.append(", defaultPurgeOnNoConsumers=" + defaultPurgeOnNoConsumers);
       buff.append(", defaultMaxConsumers=" + defaultMaxConsumers);
+      buff.append(", defaultExclusive=" + defaultExclusive);
+      buff.append(", defaultLastValue=" + defaultLastValue);
       return buff.toString();
    }
 
@@ -111,6 +140,10 @@ public class SessionBindingQueryResponseMessage_V4 extends 
SessionBindingQueryRe
          return false;
       if (defaultMaxConsumers != other.defaultMaxConsumers)
          return false;
+      // Boxed, nullable Booleans: compare by value. Using != would compare object identity and
+      // report two equal values as different whenever they are not the same cached instance.
+      if (defaultExclusive == null ? other.defaultExclusive != null : !defaultExclusive.equals(other.defaultExclusive))
+         return false;
+      if (defaultLastValue == null ? other.defaultLastValue != null : !defaultLastValue.equals(other.defaultLastValue))
+         return false;
       return true;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
index bde6593..12f2a7b 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
@@ -22,6 +22,7 @@ import 
org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.core.client.impl.QueueQueryImpl;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.utils.BufferHelper;
 
 public class SessionQueueQueryResponseMessage_V3 extends 
SessionQueueQueryResponseMessage_V2 {
 
@@ -33,12 +34,16 @@ public class SessionQueueQueryResponseMessage_V3 extends 
SessionQueueQueryRespon
 
    protected int maxConsumers;
 
+   protected Boolean exclusive;
+
+   protected Boolean lastValue;
+
    public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
-      this(result.getName(), result.getAddress(), result.isDurable(), 
result.isTemporary(), result.getFilterString(), result.getConsumerCount(), 
result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), 
result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), 
result.getMaxConsumers());
+      this(result.getName(), result.getAddress(), result.isDurable(), 
result.isTemporary(), result.getFilterString(), result.getConsumerCount(), 
result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), 
result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), 
result.getMaxConsumers(), result.isExclusive(), result.isLastValue());
    }
 
    public SessionQueueQueryResponseMessage_V3() {
-      this(null, null, false, false, null, 0, 0, false, false, false, false, 
RoutingType.MULTICAST, -1);
+      this(null, null, false, false, null, 0, 0, false, false, false, false, 
RoutingType.MULTICAST, -1, null, null);
    }
 
    private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@@ -53,7 +58,9 @@ public class SessionQueueQueryResponseMessage_V3 extends 
SessionQueueQueryRespon
                                                final boolean autoCreated,
                                                final boolean 
purgeOnNoConsumers,
                                                final RoutingType routingType,
-                                               final int maxConsumers) {
+                                               final int maxConsumers,
+                                               final Boolean exclusive,
+                                               final Boolean lastValue) {
       super(SESS_QUEUEQUERY_RESP_V3);
 
       this.durable = durable;
@@ -81,6 +88,10 @@ public class SessionQueueQueryResponseMessage_V3 extends 
SessionQueueQueryRespon
       this.routingType = routingType;
 
       this.maxConsumers = maxConsumers;
+
+      this.exclusive = exclusive;
+
+      this.lastValue = lastValue;
    }
 
    public boolean isAutoCreated() {
@@ -115,6 +126,22 @@ public class SessionQueueQueryResponseMessage_V3 extends 
SessionQueueQueryRespon
       this.maxConsumers = maxConsumers;
    }
 
+   /**
+    * @return the queue's "exclusive" flag; may be {@code null} (the no-arg constructor passes
+    *         {@code null}, and {@code decodeRest} only reads the value when bytes remain)
+    */
+   public Boolean isExclusive() {
+      return exclusive;
+   }
+
+   /**
+    * @param exclusive the "exclusive" flag to store; written by {@code encodeRest} via
+    *                  {@code BufferHelper.writeNullableBoolean}
+    */
+   public void setExclusive(Boolean exclusive) {
+      this.exclusive = exclusive;
+   }
+
+   /**
+    * @return the queue's "last value" flag; may be {@code null} (the no-arg constructor passes
+    *         {@code null}, and {@code decodeRest} only reads the value when bytes remain)
+    */
+   public Boolean isLastValue() {
+      return lastValue;
+   }
+
+   /**
+    * @param lastValue the "last value" flag to store; written by {@code encodeRest} via
+    *                  {@code BufferHelper.writeNullableBoolean}
+    */
+   public void setLastValue(Boolean lastValue) {
+      this.lastValue = lastValue;
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
@@ -122,6 +149,8 @@ public class SessionQueueQueryResponseMessage_V3 extends 
SessionQueueQueryRespon
       buffer.writeBoolean(purgeOnNoConsumers);
       buffer.writeByte(routingType.getType());
       buffer.writeInt(maxConsumers);
+      BufferHelper.writeNullableBoolean(buffer, exclusive);
+      BufferHelper.writeNullableBoolean(buffer, lastValue);
    }
 
    @Override
@@ -131,6 +160,10 @@ public class SessionQueueQueryResponseMessage_V3 extends 
SessionQueueQueryRespon
       purgeOnNoConsumers = buffer.readBoolean();
       routingType = RoutingType.getType(buffer.readByte());
       maxConsumers = buffer.readInt();
+      if (buffer.readableBytes() > 0) {
+         exclusive = BufferHelper.readNullableBoolean(buffer);
+         lastValue = BufferHelper.readNullableBoolean(buffer);
+      }
    }
 
    @Override
@@ -141,6 +174,8 @@ public class SessionQueueQueryResponseMessage_V3 extends 
SessionQueueQueryRespon
       result = prime * result + (purgeOnNoConsumers ? 1231 : 1237);
       result = prime * result + routingType.hashCode();
       result = prime * result + maxConsumers;
+      result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 
1237);
+      result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 
1237);
       return result;
    }
 
@@ -158,12 +193,14 @@ public class SessionQueueQueryResponseMessage_V3 extends 
SessionQueueQueryRespon
       buff.append(", purgeOnNoConsumers=" + purgeOnNoConsumers);
       buff.append(", routingType=" + routingType);
       buff.append(", maxConsumers=" + maxConsumers);
+      buff.append(", exclusive=" + exclusive);
+      buff.append(", lastValue=" + lastValue);
       return buff.toString();
    }
 
    @Override
    public ClientSession.QueueQuery toQueueQuery() {
-      return new QueueQueryImpl(isDurable(), isTemporary(), 
getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), 
getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), 
isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType());
+      return new QueueQueryImpl(isDurable(), isTemporary(), 
getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), 
getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), 
isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), 
isLastValue());
    }
 
    @Override
@@ -179,6 +216,10 @@ public class SessionQueueQueryResponseMessage_V3 extends 
SessionQueueQueryRespon
          return false;
       if (purgeOnNoConsumers != other.purgeOnNoConsumers)
          return false;
+      // Boxed, nullable Booleans: compare by value. Using != would compare object identity and
+      // report two equal values as different whenever they are not the same cached instance.
+      if (exclusive == null ? other.exclusive != null : !exclusive.equals(other.exclusive))
+         return false;
+      if (lastValue == null ? other.lastValue != null : !lastValue.equals(other.lastValue))
+         return false;
       if (routingType == null) {
          if (other.routingType != null)
             return false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index cf88d62..6497e3f 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -47,6 +47,10 @@ public class QueueQueryResult {
 
    private int maxConsumers;
 
+   private Boolean exclusive;
+
+   private Boolean lastValue;
+
    public QueueQueryResult(final SimpleString name,
                            final SimpleString address,
                            final boolean durable,
@@ -59,7 +63,9 @@ public class QueueQueryResult {
                            final boolean autoCreated,
                            final boolean purgeOnNoConsumers,
                            final RoutingType routingType,
-                           final int maxConsumers) {
+                           final int maxConsumers,
+                           final Boolean exclusive,
+                           final Boolean lastValue) {
       this.durable = durable;
 
       this.temporary = temporary;
@@ -85,6 +91,10 @@ public class QueueQueryResult {
       this.routingType = routingType;
 
       this.maxConsumers = maxConsumers;
+
+      this.exclusive = exclusive;
+
+      this.lastValue = lastValue;
    }
 
    public boolean isExists() {
@@ -142,4 +152,12 @@ public class QueueQueryResult {
    public void setAddress(SimpleString address) {
       this.address = address;
    }
+
+   /**
+    * @return the "exclusive" value supplied to the constructor; boxed, so it may be {@code null}
+    */
+   public Boolean isExclusive() {
+      return exclusive;
+   }
+
+   /**
+    * @return the "last value" setting supplied to the constructor; boxed, so it may be {@code null}
+    */
+   public Boolean isLastValue() {
+      return lastValue;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 1c85f08..058e606 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -171,12 +171,24 @@ public abstract class SessionContext {
     * @param routingType
     * @param filterString
     * @param durable
+    * @param exclusive
+    * @param lastValue
     * @throws ActiveMQException
     */
    public abstract void createSharedQueue(SimpleString address,
                                           SimpleString queueName,
                                           RoutingType routingType,
                                           SimpleString filterString,
+                                          boolean durable,
+                                          Integer maxConsumers,
+                                          Boolean purgeOnNoConsumers,
+                                          Boolean exclusive,
+                                          Boolean lastValue) throws 
ActiveMQException;
+
+   public abstract void createSharedQueue(SimpleString address,
+                                          SimpleString queueName,
+                                          RoutingType routingType,
+                                          SimpleString filterString,
                                           boolean durable) throws 
ActiveMQException;
 
    public abstract void createSharedQueue(SimpleString address,
@@ -199,6 +211,7 @@ public abstract class SessionContext {
                                     boolean temp,
                                     boolean autoCreated) throws 
ActiveMQException;
 
+   @Deprecated
    public abstract void createQueue(SimpleString address,
                                     RoutingType routingType,
                                     SimpleString queueName,
@@ -209,6 +222,18 @@ public abstract class SessionContext {
                                     boolean purgeOnNoConsumers,
                                     boolean autoCreated) throws 
ActiveMQException;
 
+   /**
+    * Creates a queue; this overload additionally carries the {@code exclusive} and
+    * {@code lastValue} settings.
+    *
+    * @param address
+    * @param routingType
+    * @param queueName
+    * @param filterString
+    * @param durable
+    * @param temp
+    * @param maxConsumers
+    * @param purgeOnNoConsumers
+    * @param autoCreated
+    * @param exclusive boxed, so callers may pass {@code null} (presumably "not specified" - confirm against implementations)
+    * @param lastValue boxed, so callers may pass {@code null} (presumably "not specified" - confirm against implementations)
+    * @throws ActiveMQException
+    */
+   public abstract void createQueue(SimpleString address,
+                                    RoutingType routingType,
+                                    SimpleString queueName,
+                                    SimpleString filterString,
+                                    boolean durable,
+                                    boolean temp,
+                                    int maxConsumers,
+                                    boolean purgeOnNoConsumers,
+                                    boolean autoCreated,
+                                    Boolean exclusive,
+                                    Boolean lastValue) throws ActiveMQException;
+
    public abstract ClientSession.QueueQuery queueQuery(SimpleString queueName) 
throws ActiveMQException;
 
    public abstract void forceDelivery(ClientConsumer consumer, long sequence) 
throws ActiveMQException;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
index a5ab245..c5e4884 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
@@ -24,8 +24,10 @@ import java.util.Properties;
 import java.util.UUID;
 
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.QueueAttributes;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.jndi.JNDIStorable;
+import org.apache.activemq.artemis.api.core.ParameterisedAddress;
 
 /**
  * ActiveMQ Artemis implementation of a JMS Destination.
@@ -250,6 +252,11 @@ public class ActiveMQDestination extends JNDIStorable 
implements Destination, Se
    private SimpleString simpleAddress;
 
    /**
+    * Queue attributes extracted from a parameterised address; {@code null} when the address is not parameterised.
+    */
+   private QueueAttributes queueAttributes;
+
+   /**
     * Needed for serialization backwards compatibility.
     */
    @Deprecated
@@ -280,7 +287,10 @@ public class ActiveMQDestination extends JNDIStorable 
implements Destination, Se
    protected ActiveMQDestination(final SimpleString address,
                                  final TYPE type,
                                  final ActiveMQSession session) {
-      this.simpleAddress = address;
+
+      if (address != null) {
+         setSimpleAddress(address);
+      }
 
       this.thetype = type;
 
@@ -319,8 +329,16 @@ public class ActiveMQDestination extends JNDIStorable 
implements Destination, Se
       if (address == null) {
          throw new IllegalArgumentException("address cannot be null");
       }
-      this.address = address.toString();
-      this.simpleAddress = address;
+      if (ParameterisedAddress.isParameterised(address)) {
+         ParameterisedAddress parameteredAddress = new 
ParameterisedAddress(address);
+         this.simpleAddress = parameteredAddress.getAddress();
+         this.address = parameteredAddress.getAddress().toString();
+         this.queueAttributes = parameteredAddress.getQueueAttributes();
+      } else {
+         this.simpleAddress = address;
+         this.address = address.toString();
+         this.queueAttributes = null;
+      }
    }
 
    public void delete() throws JMSException {
@@ -351,6 +369,10 @@ public class ActiveMQDestination extends JNDIStorable 
implements Destination, Se
       return simpleAddress;
    }
 
+   /**
+    * @return the queue attributes parsed from a parameterised address when the address was set,
+    *         or {@code null} when the address was not parameterised
+    */
+   public QueueAttributes getQueueAttributes() {
+      return queueAttributes;
+   }
+
    public String getName() {
       return name != null ? name : getAddress();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 9f86e49..ae1d270 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -37,6 +37,7 @@ import javax.jms.TopicPublisher;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.QueueAttributes;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -412,7 +413,7 @@ public class ActiveMQMessageProducer implements 
MessageProducer, QueueSender, To
                         // TODO is it right to use the address for the queue 
name here?
                         clientSession.createTemporaryQueue(address, 
RoutingType.ANYCAST, address);
                      } else {
-                        clientSession.createQueue(address, 
RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), 
query.isDefaultPurgeOnNoConsumers());
+                        createQueue(destination, RoutingType.ANYCAST, address, 
null, true, true, query.getDefaultMaxConsumers(), 
query.isDefaultPurgeOnNoConsumers(), query.isDefaultExclusive(), 
query.isDefaultLastValueQueue());
                      }
                   } else if (!destination.isQueue() && 
query.isAutoCreateAddresses()) {
                      clientSession.createAddress(address, 
RoutingType.MULTICAST, true);
@@ -427,7 +428,7 @@ public class ActiveMQMessageProducer implements 
MessageProducer, QueueSender, To
                      if (destination.isTemporary()) {
                         clientSession.createTemporaryQueue(address, 
RoutingType.ANYCAST, address);
                      } else {
-                        clientSession.createQueue(address, 
RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), 
query.isDefaultPurgeOnNoConsumers());
+                        createQueue(destination, RoutingType.ANYCAST, address, 
null, true, true, query.getDefaultMaxConsumers(), 
query.isDefaultPurgeOnNoConsumers(), query.isDefaultExclusive(), 
query.isDefaultLastValueQueue());
                      }
                   }
                }
@@ -535,6 +536,27 @@ public class ActiveMQMessageProducer implements 
MessageProducer, QueueSender, To
       }
    }
 
+   /**
+    * Creates a queue on the destination's address. Any queue attribute carried by the
+    * destination takes precedence over the corresponding value passed in; attributes the
+    * destination does not define fall back to the supplied arguments.
+    */
+   private void createQueue(ActiveMQDestination destination,
+                            RoutingType routingType,
+                            SimpleString queueName,
+                            SimpleString filter,
+                            boolean durable,
+                            boolean autoCreated,
+                            int maxConsumers,
+                            boolean purgeOnNoConsumers,
+                            Boolean exclusive,
+                            Boolean lastValue) throws ActiveMQException {
+      // Start from the caller-supplied values ...
+      int effectiveMaxConsumers = maxConsumers;
+      boolean effectivePurgeOnNoConsumers = purgeOnNoConsumers;
+      Boolean effectiveExclusive = exclusive;
+      Boolean effectiveLastValue = lastValue;
+
+      // ... then let each attribute defined on the destination override its counterpart.
+      final QueueAttributes overrides = destination.getQueueAttributes();
+      if (overrides != null) {
+         if (overrides.getMaxConsumers() != null) {
+            effectiveMaxConsumers = overrides.getMaxConsumers();
+         }
+         if (overrides.getPurgeOnNoConsumers() != null) {
+            effectivePurgeOnNoConsumers = overrides.getPurgeOnNoConsumers();
+         }
+         if (overrides.getExclusive() != null) {
+            effectiveExclusive = overrides.getExclusive();
+         }
+         if (overrides.getLastValue() != null) {
+            effectiveLastValue = overrides.getLastValue();
+         }
+      }
+
+      clientSession.createQueue(
+         destination.getSimpleAddress(),
+         routingType,
+         queueName,
+         filter,
+         durable,
+         autoCreated,
+         effectiveMaxConsumers,
+         effectivePurgeOnNoConsumers,
+         effectiveExclusive,
+         effectiveLastValue
+      );
+   }
+
+
    private static final class CompletionListenerWrapper implements 
SendAcknowledgementHandler {
 
       private final CompletionListener completionListener;

Reply via email to