http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/Interceptor.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/Interceptor.java 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/Interceptor.java
index 6f56d00..8b23cea 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/api/core/Interceptor.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/Interceptor.java
@@ -20,7 +20,7 @@ import org.hornetq.spi.core.protocol.RemotingConnection;
  * <p>
  * To add an interceptor to HornetQ server, you have to modify the server 
configuration file
  * {@literal hornetq-configuration.xml}.<br>
- * To add it to a client, use {@link ServerLocator#addIncomingInterceptor(Interceptor)}
+ * To add it to a client, use {@link org.hornetq.api.core.client.ServerLocator#addIncomingInterceptor(Interceptor)}
  *
  * @author [email protected]
  * @author <a href="mailto:[email protected]">Tim Fox</a>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/JGroupsBroadcastGroupConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/JGroupsBroadcastGroupConfiguration.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/JGroupsBroadcastGroupConfiguration.java
index 208c7b7..6c55c38 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/JGroupsBroadcastGroupConfiguration.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/JGroupsBroadcastGroupConfiguration.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import org.jgroups.JChannel;
 import org.jgroups.Message;
 import org.jgroups.ReceiverAdapter;
+import org.jgroups.conf.PlainConfigurator;
 
 /**
  * The configuration for creating broadcasting/discovery groups using JGroups 
channels
@@ -55,6 +56,8 @@ public final class JGroupsBroadcastGroupConfiguration 
implements BroadcastEndpoi
    {
       factory = new BroadcastEndpointFactory()
       {
+         private static final long serialVersionUID = 1047956472941098435L;
+
          @Override
          public BroadcastEndpoint createBroadcastEndpoint() throws Exception
          {
@@ -69,6 +72,8 @@ public final class JGroupsBroadcastGroupConfiguration 
implements BroadcastEndpoi
    {
       factory = new BroadcastEndpointFactory()
       {
+         private static final long serialVersionUID = 5110372849181145377L;
+
          @Override
          public BroadcastEndpoint createBroadcastEndpoint() throws Exception
          {
@@ -182,12 +187,23 @@ public final class JGroupsBroadcastGroupConfiguration 
implements BroadcastEndpoi
          broadcastOpened = true;
       }
 
-      private void initChannel(final String fileName, final String channelName) throws Exception
+      private void initChannel(final String jgroupsConfig, final String channelName) throws Exception
       {
-         URL configURL = Thread.currentThread().getContextClassLoader().getResource(fileName);
+         PlainConfigurator configurator = new PlainConfigurator(jgroupsConfig);
+         try
+         {
+            this.channel = JChannelManager.getJChannel(channelName, configurator);
+            return;
+         }
+         catch (Exception e)
+         {
+            this.channel = null;
+         }
+         URL configURL = Thread.currentThread().getContextClassLoader().getResource(jgroupsConfig);
+
          if (configURL == null)
          {
-            throw new RuntimeException("couldn't find JGroups configuration " + fileName);
+            throw new RuntimeException("couldn't find JGroups configuration " + jgroupsConfig);
          }
          this.channel = JChannelManager.getJChannel(channelName, configURL);
       }
@@ -242,45 +258,6 @@ public final class JGroupsBroadcastGroupConfiguration 
implements BroadcastEndpoi
       }
 
       /**
-       * This is used for identifying a unique JChannel instance.
-       * Because we have two ways to get a JChannel (by configuration
-       * or by passing in a JChannel instance), the key needs to take
-       * this into consideration when doing comparison.
-       *
-       * @param <T> : either being a JChannel or a URL representing the JGroups
-       *            configuration file.
-       */
-      private static class ChannelKey<T>
-      {
-         private final String name;
-         private final T channelSource;
-
-         public ChannelKey(String name, T t)
-         {
-            this.name = name;
-            this.channelSource = t;
-         }
-
-         @Override
-         public int hashCode()
-         {
-            return name.hashCode();
-         }
-
-         @Override
-         public boolean equals(Object t)
-         {
-            if (t == null || (!(t instanceof ChannelKey)))
-            {
-               return false;
-            }
-
-            ChannelKey<?> key = (ChannelKey<?>) t;
-            return (name.equals(key.name) && channelSource.equals(key.channelSource));
-         }
-      }
-
-      /**
        * This class wraps a JChannel with a reference counter. The reference 
counter
        * controls the life of the JChannel. When reference count is zero, the 
channel
        * will be disconnected.
@@ -292,7 +269,6 @@ public final class JGroupsBroadcastGroupConfiguration 
implements BroadcastEndpoi
          int refCount = 1;
          JChannel channel;
          String channelName;
-         T source;
          List<JGroupsReceiver> receivers = new ArrayList<JGroupsReceiver>();
 
          public JChannelWrapper(String channelName, T t) throws Exception
@@ -307,11 +283,14 @@ public final class JGroupsBroadcastGroupConfiguration 
implements BroadcastEndpoi
             {
                this.channel = (JChannel) t;
             }
+            else if (t instanceof PlainConfigurator)
+            {
+               this.channel = new JChannel((PlainConfigurator)t);
+            }
             else
             {
                throw new IllegalArgumentException("Unsupported type " + t);
             }
-            this.source = t;
          }
 
          public synchronized void close()
@@ -319,7 +298,7 @@ public final class JGroupsBroadcastGroupConfiguration 
implements BroadcastEndpoi
             refCount--;
             if (refCount == 0)
             {
-               JChannelManager.closeChannel(new ChannelKey<T>(this.channelName, source), this.channelName, channel);
+               JChannelManager.closeChannel(this.channelName, channel);
             }
          }
 
@@ -387,31 +366,30 @@ public final class JGroupsBroadcastGroupConfiguration 
implements BroadcastEndpoi
        */
       private static class JChannelManager
       {
-         private static Map<ChannelKey<?>, JChannelWrapper<?>> channels;
+         private static Map<String, JChannelWrapper<?>> channels;
 
          public static synchronized <T> JChannelWrapper<?> getJChannel(String 
channelName, T t) throws Exception
          {
             if (channels == null)
             {
-               channels = new HashMap<ChannelKey<?>, JChannelWrapper<?>>();
+               channels = new HashMap<String, JChannelWrapper<?>>();
             }
-            ChannelKey<T> key = new ChannelKey<T>(channelName, t);
-            JChannelWrapper<?> wrapper = channels.get(key);
+            JChannelWrapper<?> wrapper = channels.get(channelName);
             if (wrapper == null)
             {
                wrapper = new JChannelWrapper<T>(channelName, t);
-               channels.put(key, wrapper);
+               channels.put(channelName, wrapper);
                return wrapper;
             }
             return wrapper.addRef();
          }
 
-         public static synchronized void closeChannel(ChannelKey<?> key, String channelName, JChannel channel)
+         public static synchronized void closeChannel(String channelName, JChannel channel)
          {
             channel.setReceiver(null);
             channel.disconnect();
             channel.close();
-            JChannelWrapper<?> wrapper = channels.remove(key);
+            JChannelWrapper<?> wrapper = channels.remove(channelName);
             if (wrapper == null)
             {
                throw new IllegalStateException("Did not find channel " + 
channelName);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/Message.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/Message.java 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/Message.java
index 52867f8..2f93e25 100644
--- a/hornetq-core-client/src/main/java/org/hornetq/api/core/Message.java
+++ b/hornetq-core-client/src/main/java/org/hornetq/api/core/Message.java
@@ -44,7 +44,7 @@ import org.hornetq.utils.UUID;
  * </pre>
  * <p>
  * If conversion is not allowed (for example calling {@code getFloatProperty} 
on a property set a
- * {@code boolean}), a {@link PropertyConversionException} will be thrown.
+ * {@code boolean}), a {@link HornetQPropertyConversionException} will be thrown.
  *
  * @author <a href="mailto:[email protected]">Tim Fox</a>
  * @author <a href="mailto:[email protected]">ClebertSuconic</a>
@@ -104,7 +104,7 @@ public interface Message
     *
     * @param userID
     */
-   void setUserID(UUID userID);
+   Message setUserID(UUID userID);
 
    /**
     * Returns the address this message is sent to.
@@ -135,7 +135,7 @@ public interface Message
     *
     * @param durable {@code true} to flag this message as durable, {@code 
false} else
     */
-   void setDurable(boolean durable);
+   Message setDurable(boolean durable);
 
    /**
     * Returns the expiration time of this message.
@@ -152,7 +152,7 @@ public interface Message
     *
     * @param expiration expiration time
     */
-   void setExpiration(long expiration);
+   Message setExpiration(long expiration);
 
    /**
     * Returns the message timestamp.
@@ -167,7 +167,7 @@ public interface Message
     *
     * @param timestamp timestamp
     */
-   void setTimestamp(long timestamp);
+   Message setTimestamp(long timestamp);
 
    /**
     * Returns the message priority.
@@ -183,7 +183,7 @@ public interface Message
     *
     * @param priority the new message priority
     */
-   void setPriority(byte priority);
+   Message setPriority(byte priority);
 
    /**
     * Returns the size of the <em>encoded</em> message.
@@ -201,6 +201,16 @@ public interface Message
    HornetQBuffer getBodyBuffer();
 
    /**
+    * Writes the input byte array to the message body HornetQBuffer
+    */
+   Message writeBodyBufferBytes(byte[] bytes);
+
+   /**
+    * Writes the input String to the message body HornetQBuffer
+    */
+   Message writeBodyBufferString(String string);
+
+   /**
     * Returns a <em>copy</em> of the message body as a HornetQBuffer. Any 
modification
     * of this buffer should not impact the underlying buffer.
     */

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfiguration.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfiguration.java
index 86d2c4f..2f06fb8 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfiguration.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfiguration.java
@@ -17,6 +17,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.hornetq.core.client.HornetQClientMessageBundle;
+import org.hornetq.core.remoting.impl.TransportConfigurationUtil;
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.utils.UUIDGenerator;
 
@@ -81,6 +82,7 @@ public class TransportConfiguration implements Serializable
     */
    public TransportConfiguration()
    {
+      this.params = new HashMap<>();
    }
 
    /**
@@ -95,7 +97,14 @@ public class TransportConfiguration implements Serializable
    {
       factoryClassName = className;
 
-      this.params = params;
+      if (params == null || params.isEmpty())
+      {
+         this.params = TransportConfigurationUtil.getDefaults(className);
+      }
+      else
+      {
+         this.params = params;
+      }
 
       this.name = name;
    }
@@ -402,4 +411,4 @@ public class TransportConfiguration implements Serializable
    {
       return str.replace('.', '-');
    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfigurationHelper.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfigurationHelper.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfigurationHelper.java
new file mode 100644
index 0000000..9a9565b
--- /dev/null
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/TransportConfigurationHelper.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.core;
+
+import java.util.Map;
+
+/**
+ * Helper interface for specifying default parameters on Transport Configurations.
+ *
+ * @author <a href="mailto:[email protected]">Martyn Taylor</a>
+ */
+public interface TransportConfigurationHelper
+{
+   Map<String, Object> getDefaults();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/UDPBroadcastGroupConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/UDPBroadcastGroupConfiguration.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/UDPBroadcastGroupConfiguration.java
index 8ddd627..73deb9a 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/UDPBroadcastGroupConfiguration.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/UDPBroadcastGroupConfiguration.java
@@ -20,7 +20,6 @@ import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.MulticastSocket;
-import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
 import org.hornetq.core.client.HornetQClientLogger;
@@ -37,23 +36,16 @@ public final class UDPBroadcastGroupConfiguration 
implements BroadcastEndpointFa
 {
    private static final long serialVersionUID = 1052413739064253955L;
 
-   private final transient String localBindAddress;
+   private transient String localBindAddress = null;
 
-   private final transient int localBindPort;
+   private transient int localBindPort = -1;
 
-   private final String groupAddress;
+   private String groupAddress = null;
 
-   private final int groupPort;
+   private int groupPort = -1;
 
-   public UDPBroadcastGroupConfiguration(final String groupAddress,
-                                         final int groupPort,
-                                         final String localBindAddress,
-                                         final int localBindPort)
+   public UDPBroadcastGroupConfiguration()
    {
-      this.groupAddress = groupAddress;
-      this.groupPort = groupPort;
-      this.localBindAddress = localBindAddress;
-      this.localBindPort = localBindPort;
    }
 
    public BroadcastEndpointFactory createBroadcastEndpointFactory()
@@ -63,10 +55,11 @@ public final class UDPBroadcastGroupConfiguration 
implements BroadcastEndpointFa
          @Override
          public BroadcastEndpoint createBroadcastEndpoint() throws Exception
          {
-            return new UDPBroadcastEndpoint(groupAddress != null ? InetAddress.getByName(groupAddress) : null,
-                                            groupPort,
-                                            localBindAddress != null ? InetAddress.getByName(localBindAddress) : null,
-                                            localBindPort);
+            return new UDPBroadcastEndpoint()
+               .setGroupAddress(groupAddress != null ? InetAddress.getByName(groupAddress) : null)
+               .setGroupPort(groupPort)
+               .setLocalBindAddress(localBindAddress != null ? InetAddress.getByName(localBindAddress) : null)
+               .setLocalBindPort(localBindPort);
          }
       };
    }
@@ -76,21 +69,45 @@ public final class UDPBroadcastGroupConfiguration 
implements BroadcastEndpointFa
       return groupAddress;
    }
 
+   public UDPBroadcastGroupConfiguration setGroupAddress(String groupAddress)
+   {
+      this.groupAddress = groupAddress;
+      return this;
+   }
+
    public int getGroupPort()
    {
       return groupPort;
    }
 
+   public UDPBroadcastGroupConfiguration setGroupPort(int groupPort)
+   {
+      this.groupPort = groupPort;
+      return this;
+   }
+
    public int getLocalBindPort()
    {
       return localBindPort;
    }
 
+   public UDPBroadcastGroupConfiguration setLocalBindPort(int localBindPort)
+   {
+      this.localBindPort = localBindPort;
+      return this;
+   }
+
    public String getLocalBindAddress()
    {
       return localBindAddress;
    }
 
+   public UDPBroadcastGroupConfiguration setLocalBindAddress(String localBindAddress)
+   {
+      this.localBindAddress = localBindAddress;
+      return this;
+   }
+
    /**
     * <p> This is the member discovery implementation using direct UDP. It was 
extracted as a refactoring from
     * {@link org.hornetq.core.cluster.DiscoveryGroup}</p>
@@ -103,13 +120,13 @@ public final class UDPBroadcastGroupConfiguration 
implements BroadcastEndpointFa
    {
       private static final int SOCKET_TIMEOUT = 500;
 
-      private final InetAddress localAddress;
+      private InetAddress localAddress;
 
-      private final int localBindPort;
+      private int localBindPort;
 
-      private final InetAddress groupAddress;
+      private InetAddress groupAddress;
 
-      private final int groupPort;
+      private int groupPort;
 
       private DatagramSocket broadcastingSocket;
 
@@ -117,15 +134,32 @@ public final class UDPBroadcastGroupConfiguration 
implements BroadcastEndpointFa
 
       private volatile boolean open;
 
-      public UDPBroadcastEndpoint(final InetAddress groupAddress,
-                                  final int groupPort,
-                                  final InetAddress localBindAddress,
-                                  final int localBindPort) throws UnknownHostException
+      public UDPBroadcastEndpoint()
+      {
+      }
+
+      public UDPBroadcastEndpoint setGroupAddress(InetAddress groupAddress)
       {
          this.groupAddress = groupAddress;
+         return this;
+      }
+
+      public UDPBroadcastEndpoint setGroupPort(int groupPort)
+      {
          this.groupPort = groupPort;
-         this.localAddress = localBindAddress;
+         return this;
+      }
+
+      public UDPBroadcastEndpoint setLocalBindAddress(InetAddress localAddress)
+      {
+         this.localAddress = localAddress;
+         return this;
+      }
+
+      public UDPBroadcastEndpoint setLocalBindPort(int localBindPort)
+      {
          this.localBindPort = localBindPort;
+         return this;
       }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java
index 3f60863..06e3f56 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientConsumer.java
@@ -13,6 +13,7 @@
 package org.hornetq.api.core.client;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.spi.core.remoting.ConsumerContext;
 
 /**
  * A ClientConsumer receives messages from HornetQ queues.
@@ -38,7 +39,7 @@ public interface ClientConsumer extends AutoCloseable
     * HornetQ implements this as a long but this could be protocol dependent.
     * @return
     */
-   Object getId();
+   ConsumerContext getConsumerContext();
 
    /**
     * Receives a message from a queue.
@@ -95,7 +96,7 @@ public interface ClientConsumer extends AutoCloseable
     * @param handler a MessageHandler
     * @throws HornetQException if an exception occurs while setting the 
MessageHandler
     */
-   void setMessageHandler(MessageHandler handler) throws HornetQException;
+   ClientConsumer setMessageHandler(MessageHandler handler) throws HornetQException;
 
    /**
     * Closes the consumer.

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientMessage.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientMessage.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientMessage.java
index 7903e0e..1a23af3 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientMessage.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientMessage.java
@@ -17,6 +17,7 @@ import java.io.OutputStream;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
 
 /**
  *
@@ -38,8 +39,9 @@ public interface ClientMessage extends Message
     * <p>
     * This method is not meant to be called by HornetQ clients.
     * @param deliveryCount message delivery count
+    * @return this ClientMessage
     */
-   void setDeliveryCount(int deliveryCount);
+   ClientMessage setDeliveryCount(int deliveryCount);
 
    /**
     * Acknowledges reception of this message.
@@ -50,7 +52,7 @@ public interface ClientMessage extends Message
     * @throws HornetQException if an error occurred while acknowledging the 
message.
     * @see ClientSession#isAutoCommitAcks()
     */
-   void acknowledge() throws HornetQException;
+   ClientMessage acknowledge() throws HornetQException;
 
    /**
     * Acknowledges reception of a single message.
@@ -61,7 +63,7 @@ public interface ClientMessage extends Message
     * @throws HornetQException if an error occurred while acknowledging the 
message.
     * @see ClientSession#isAutoCommitAcks()
     */
-   void individualAcknowledge() throws HornetQException;
+   ClientMessage individualAcknowledge() throws HornetQException;
 
    /**
     * This can be optionally used to verify if the entire message has been 
received.
@@ -84,8 +86,9 @@ public interface ClientMessage extends Message
     * This method is used when consuming large messages
     *
     * @throws HornetQException
+    * @return this ClientMessage
     */
-   void setOutputStream(OutputStream out) throws HornetQException;
+   ClientMessage setOutputStream(OutputStream out) throws HornetQException;
 
    /**
     * Saves the content of the message to the OutputStream.
@@ -111,7 +114,119 @@ public interface ClientMessage extends Message
     * Sets the body's IntputStream.
     * <br>
     * This method is used when sending large messages
+    * @return this ClientMessage
     */
-   void setBodyInputStream(InputStream bodyInputStream);
+   ClientMessage setBodyInputStream(InputStream bodyInputStream);
+
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putBooleanProperty(SimpleString key, boolean value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putBooleanProperty(String key, boolean value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putByteProperty(SimpleString key, byte value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putByteProperty(String key, byte value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putBytesProperty(SimpleString key, byte[] value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putBytesProperty(String key, byte[] value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putShortProperty(SimpleString key, short value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putShortProperty(String key, short value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putCharProperty(SimpleString key, char value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putCharProperty(String key, char value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putIntProperty(SimpleString key, int value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putIntProperty(String key, int value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putLongProperty(SimpleString key, long value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putLongProperty(String key, long value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putFloatProperty(SimpleString key, float value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putFloatProperty(String key, float value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putDoubleProperty(SimpleString key, double value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putDoubleProperty(String key, double value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putStringProperty(SimpleString key, SimpleString value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage putStringProperty(String key, String value);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage writeBodyBufferBytes(byte[] bytes);
+
+   /**
+    * Overridden from {@link org.hornetq.api.core.Message} to enable fluent API
+    */
+   ClientMessage writeBodyBufferString(String string);
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java
index cb56d0a..b55c9e1 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSession.java
@@ -113,7 +113,7 @@ public interface ClientSession extends XAResource, 
AutoCloseable
     *
     * @throws HornetQException if an exception occurs while starting the 
session
     */
-   void start() throws HornetQException;
+   ClientSession start() throws HornetQException;
 
    /**
     * Stops the session.
@@ -642,8 +642,9 @@ public interface ClientSession extends XAResource, 
AutoCloseable
     * Sets a <code>SendAcknowledgementHandler</code> for this session.
     *
     * @param handler a SendAcknowledgementHandler
+    * @return this ClientSession
     */
-   void setSendAcknowledgementHandler(SendAcknowledgementHandler handler);
+   ClientSession setSendAcknowledgementHandler(SendAcknowledgementHandler handler);
 
    /**
     * Attach any metadata to the session.

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java
index f6936b5..4c2d98c 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ClientSessionFactory.java
@@ -144,8 +144,9 @@ public interface ClientSessionFactory extends AutoCloseable
     * Adds a FailoverEventListener to the session which is notified if a 
failover event  occurs on the session.
     *
     * @param listener the listener to add
+    * @return this ClientSessionFactory
     */
-   void addFailoverListener(FailoverEventListener listener);
+   ClientSessionFactory addFailoverListener(FailoverEventListener listener);
 
    /**
     * Removes a FailoverEventListener to the session.

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ServerLocator.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ServerLocator.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ServerLocator.java
index 01a830a..eb2efa8 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ServerLocator.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/client/ServerLocator.java
@@ -16,6 +16,7 @@ import org.hornetq.api.core.DiscoveryGroupConfiguration;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.client.impl.Topology;
+import org.hornetq.spi.core.remoting.ClientProtocolManagerFactory;
 
 /**
  * The serverLocator locates a server, but beyond that it locates a server 
based on a list.
@@ -24,7 +25,7 @@ import org.hornetq.core.client.impl.Topology;
  * HA, the locator will always get an updated list of members to the server, 
the server will send
  * the updated list to the client.
  * <p>
- * If you use UDP or JGroups (exclusively JGropus or UDP), the initial discovery is done by the
+ * If you use UDP or JGroups (exclusively JGroups or UDP), the initial discovery is done by the
  * grouping finder, after the initial connection is made the server will 
always send updates to the
  * client. But the listeners will listen for updates on grouping.
  *
@@ -114,8 +115,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be -1 (to disable) or greater than 0.
     *
     * @param clientFailureCheckPeriod the period to check failure
+    * @return this ServerLocator
     */
-   void setClientFailureCheckPeriod(long clientFailureCheckPeriod);
+   ServerLocator setClientFailureCheckPeriod(long clientFailureCheckPeriod);
 
    /**
     * When <code>true</code>, consumers created through this factory will 
create temporary files to
@@ -134,8 +136,9 @@ public interface ServerLocator extends AutoCloseable
     * Sets whether large messages received by consumers created through this 
factory will be cached in temporary files or not.
     *
     * @param cached <code>true</code> to cache large messages in temporary 
files, <code>false</code> else
+    * @return this ServerLocator
     */
-   void setCacheLargeMessagesClient(boolean cached);
+   ServerLocator setCacheLargeMessagesClient(boolean cached);
 
    /**
     * Returns the connection <em>time-to-live</em>.
@@ -154,14 +157,15 @@ public interface ServerLocator extends AutoCloseable
     * Value must be -1 (to disable) or greater or equals to 0.
     *
     * @param connectionTTL period in milliseconds
+    * @return this ServerLocator
     */
-   void setConnectionTTL(long connectionTTL);
+   ServerLocator setConnectionTTL(long connectionTTL);
 
    /**
     * Returns the blocking calls timeout.
     * <p>
     * If client's blocking calls to the server take more than this timeout, 
the call will throw a
-    * {@link HornetQException} with the code {@link 
HornetQExceptionType#CONNECTION_TIMEDOUT}. Value
+    * {@link org.hornetq.api.core.HornetQException} with the code {@link 
org.hornetq.api.core.HornetQExceptionType#CONNECTION_TIMEDOUT}. Value
     * is in milliseconds, default value is {@link 
HornetQClient#DEFAULT_CALL_TIMEOUT}.
     *
     * @return the blocking calls timeout
@@ -174,8 +178,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be greater or equals to 0
     *
     * @param callTimeout blocking call timeout in milliseconds
+    * @return this ServerLocator
     */
-   void setCallTimeout(long callTimeout);
+   ServerLocator setCallTimeout(long callTimeout);
 
 
    /**
@@ -197,8 +202,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be greater or equals to -1, -1 means forever
     *
     * @param callFailoverTimeout blocking call timeout in milliseconds
+    * @return this ServerLocator
     */
-   void setCallFailoverTimeout(long callFailoverTimeout);
+   ServerLocator setCallFailoverTimeout(long callFailoverTimeout);
 
    /**
     * Returns the large message size threshold.
@@ -216,8 +222,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be greater than 0.
     *
     * @param minLargeMessageSize large message size threshold in bytes
+    * @return this ServerLocator
     */
-   void setMinLargeMessageSize(int minLargeMessageSize);
+   ServerLocator setMinLargeMessageSize(int minLargeMessageSize);
 
    /**
     * Returns the window size for flow control of the consumers created 
through this factory.
@@ -235,8 +242,9 @@ public interface ServerLocator extends AutoCloseable
     * (to set the maximum size of the buffer)
     *
     * @param consumerWindowSize window size (in bytes) used for consumer flow 
control
+    * @return this ServerLocator
     */
-   void setConsumerWindowSize(int consumerWindowSize);
+   ServerLocator setConsumerWindowSize(int consumerWindowSize);
 
    /**
     * Returns the maximum rate of message consumption for consumers created 
through this factory.
@@ -256,8 +264,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must -1 (to disable) or a positive integer corresponding to the 
maximum desired message consumption rate specified in units of messages per 
second.
     *
     * @param consumerMaxRate maximum rate of message consumption (in messages 
per seconds)
+    * @return this ServerLocator
     */
-   void setConsumerMaxRate(int consumerMaxRate);
+   ServerLocator setConsumerMaxRate(int consumerMaxRate);
 
    /**
     * Returns the size for the confirmation window of clients using this 
factory.
@@ -275,8 +284,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be -1 (to disable the window) or greater than 0.
     *
     * @param confirmationWindowSize size of the confirmation window (in bytes)
+    * @return this ServerLocator
     */
-   void setConfirmationWindowSize(int confirmationWindowSize);
+   ServerLocator setConfirmationWindowSize(int confirmationWindowSize);
 
    /**
     * Returns the window size for flow control of the producers created 
through this factory.
@@ -294,8 +304,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be -1 (to disable flow control) or greater than 0.
     *
     * @param producerWindowSize window size (in bytest) for flow control of 
the producers created through this factory.
+    * @return this ServerLocator
     */
-   void setProducerWindowSize(int producerWindowSize);
+   ServerLocator setProducerWindowSize(int producerWindowSize);
 
    /**
     * Returns the maximum rate of message production for producers created 
through this factory.
@@ -315,8 +326,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must -1 (to disable) or a positive integer corresponding to the 
maximum desired message production rate specified in units of messages per 
second.
     *
     * @param producerMaxRate maximum rate of message production (in messages 
per seconds)
+    * @return this ServerLocator
     */
-   void setProducerMaxRate(int producerMaxRate);
+   ServerLocator setProducerMaxRate(int producerMaxRate);
 
    /**
     * Returns whether consumers created through this factory will block while
@@ -336,8 +348,9 @@ public interface ServerLocator extends AutoCloseable
     * @param blockOnAcknowledge <code>true</code> to block when sending message
     *                           acknowledgments or <code>false</code> to send 
them
     *                           asynchronously
+    * @return this ServerLocator
     */
-   void setBlockOnAcknowledge(boolean blockOnAcknowledge);
+   ServerLocator setBlockOnAcknowledge(boolean blockOnAcknowledge);
 
    /**
     * Returns whether producers created through this factory will block while 
sending <em>durable</em> messages or do it asynchronously.
@@ -355,8 +368,9 @@ public interface ServerLocator extends AutoCloseable
     * Sets whether producers created through this factory will block while 
sending <em>durable</em> messages or do it asynchronously.
     *
     * @param blockOnDurableSend <code>true</code> to block when sending 
durable messages or <code>false</code> to send them asynchronously
+    * @return this ServerLocator
     */
-   void setBlockOnDurableSend(boolean blockOnDurableSend);
+   ServerLocator setBlockOnDurableSend(boolean blockOnDurableSend);
 
    /**
     * Returns whether producers created through this factory will block while 
sending <em>non-durable</em> messages or do it asynchronously.
@@ -374,8 +388,9 @@ public interface ServerLocator extends AutoCloseable
     * Sets whether producers created through this factory will block while 
sending <em>non-durable</em> messages or do it asynchronously.
     *
     * @param blockOnNonDurableSend <code>true</code> to block when sending 
non-durable messages or <code>false</code> to send them asynchronously
+    * @return this ServerLocator
     */
-   void setBlockOnNonDurableSend(boolean blockOnNonDurableSend);
+   ServerLocator setBlockOnNonDurableSend(boolean blockOnNonDurableSend);
 
    /**
     * Returns whether producers created through this factory will automatically
@@ -394,8 +409,9 @@ public interface ServerLocator extends AutoCloseable
     * assign a group ID to the messages they sent.
     *
     * @param autoGroup <code>true</code> to automatically assign a group ID to 
each messages sent through this factory, <code>false</code> else
+    * @return this ServerLocator
     */
-   void setAutoGroup(boolean autoGroup);
+   ServerLocator setAutoGroup(boolean autoGroup);
 
    /**
     * Returns the group ID that will be eventually set on each message for the 
property {@link org.hornetq.api.core.Message#HDR_GROUP_ID}.
@@ -410,8 +426,9 @@ public interface ServerLocator extends AutoCloseable
     * Sets the group ID that will be  set on each message sent through this 
factory.
     *
     * @param groupID the group ID to use
+    * @return this ServerLocator
     */
-   void setGroupID(String groupID);
+   ServerLocator setGroupID(String groupID);
 
    /**
     * Returns whether messages will pre-acknowledged on the server before they 
are sent to the consumers or not.
@@ -427,8 +444,9 @@ public interface ServerLocator extends AutoCloseable
     *
     * @param preAcknowledge <code>true</code> to enable pre-acknowledgment,
     *                       <code>false</code> else
+    * @return this ServerLocator
     */
-   void setPreAcknowledge(boolean preAcknowledge);
+   ServerLocator setPreAcknowledge(boolean preAcknowledge);
 
    /**
     * Returns the acknowledgments batch size.
@@ -445,8 +463,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be equal or greater than 0.
     *
     * @param ackBatchSize acknowledgments batch size
+    * @return this ServerLocator
     */
-   void setAckBatchSize(int ackBatchSize);
+   ServerLocator setAckBatchSize(int ackBatchSize);
 
    /**
     * Returns an array of TransportConfigurations representing the static list 
of live servers used
@@ -476,8 +495,9 @@ public interface ServerLocator extends AutoCloseable
     * or its own pools.
     *
     * @param useGlobalPools <code>true</code> to let this factory uses global 
thread pools, <code>false</code> else
+    * @return this ServerLocator
     */
-   void setUseGlobalPools(boolean useGlobalPools);
+   ServerLocator setUseGlobalPools(boolean useGlobalPools);
 
    /**
     * Returns the maximum size of the scheduled thread pool.
@@ -495,8 +515,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be greater than 0.
     *
     * @param scheduledThreadPoolMaxSize maximum size of the scheduled thread 
pool.
+    * @return this ServerLocator
     */
-   void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize);
+   ServerLocator setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize);
 
    /**
     * Returns the maximum size of the thread pool.
@@ -514,8 +535,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be -1 (for unlimited thread pool) or greater than 0.
     *
     * @param threadPoolMaxSize maximum size of the thread pool.
+    * @return this ServerLocator
     */
-   void setThreadPoolMaxSize(int threadPoolMaxSize);
+   ServerLocator setThreadPoolMaxSize(int threadPoolMaxSize);
 
    /**
     * Returns the time to retry connections created by this factory after 
failure.
@@ -532,8 +554,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be greater than 0.
     *
     * @param retryInterval time (in milliseconds) to retry connections created 
by this factory after failure
+    * @return this ServerLocator
     */
-   void setRetryInterval(long retryInterval);
+   ServerLocator setRetryInterval(long retryInterval);
 
    /**
     * Returns the multiplier to apply to successive retry intervals.
@@ -550,8 +573,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be positive.
     *
     * @param retryIntervalMultiplier multiplier to apply to successive retry 
intervals
+    * @return this ServerLocator
     */
-   void setRetryIntervalMultiplier(double retryIntervalMultiplier);
+   ServerLocator setRetryIntervalMultiplier(double retryIntervalMultiplier);
 
    /**
     * Returns the maximum retry interval (in the case a retry interval 
multiplier has been specified).
@@ -569,8 +593,9 @@ public interface ServerLocator extends AutoCloseable
     *
     * @param maxRetryInterval maximum retry interval to apply in the case a 
retry interval multiplier
     *                         has been specified
+    * @return this ServerLocator
     */
-   void setMaxRetryInterval(long maxRetryInterval);
+   ServerLocator setMaxRetryInterval(long maxRetryInterval);
 
    /**
     * Returns the maximum number of attempts to retry connection in case of 
failure.
@@ -587,8 +612,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be -1 (to retry infinitely), 0 (to never retry connection) or 
greater than 0.
     *
     * @param reconnectAttempts maximum number of attempts to retry connection 
in case of failure
+    * @return this ServerLocator
     */
-   void setReconnectAttempts(int reconnectAttempts);
+   ServerLocator setReconnectAttempts(int reconnectAttempts);
 
    /**
     * Sets the maximum number of attempts to establish an initial connection.
@@ -596,8 +622,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be -1 (to retry infinitely), 0 (to never retry connection) or 
greater than 0.
     *
     * @param reconnectAttempts maximum number of attempts for the initial 
connection
+    * @return this ServerLocator
     */
-   void setInitialConnectAttempts(int reconnectAttempts);
+   ServerLocator setInitialConnectAttempts(int reconnectAttempts);
 
    /**
     * @return the number of attempts to be made for first initial connection.
@@ -616,8 +643,9 @@ public interface ServerLocator extends AutoCloseable
     * Sets the value for FailoverOnInitialReconnection
     *
     * @param failover
+    * @return this ServerLocator
     */
-   void setFailoverOnInitialConnection(boolean failover);
+   ServerLocator setFailoverOnInitialConnection(boolean failover);
 
    /**
     * Returns the class name of the connection load balancing policy.
@@ -631,11 +659,12 @@ public interface ServerLocator extends AutoCloseable
    /**
     * Sets the class name of the connection load balancing policy.
     * <p>
-    * Value must be the name of a class implementing {@link 
ConnectionLoadBalancingPolicy}.
+    * Value must be the name of a class implementing {@link 
org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy}.
     *
     * @param loadBalancingPolicyClassName class name of the connection load 
balancing policy
+    * @return this ServerLocator
     */
-   void setConnectionLoadBalancingPolicyClassName(String 
loadBalancingPolicyClassName);
+   ServerLocator setConnectionLoadBalancingPolicyClassName(String 
loadBalancingPolicyClassName);
 
    /**
     * Returns the initial size of messages created through this factory.
@@ -652,8 +681,9 @@ public interface ServerLocator extends AutoCloseable
     * Value must be greater than 0.
     *
     * @param size initial size of messages created through this factory.
+    * @return this ServerLocator
     */
-   void setInitialMessagePacketSize(int size);
+   ServerLocator setInitialMessagePacketSize(int size);
 
    /**
     * Adds an interceptor which will be executed <em>after packets are 
received from the server</em>. Invoking this
@@ -671,15 +701,17 @@ public interface ServerLocator extends AutoCloseable
     * Adds an interceptor which will be executed <em>after packets are 
received from the server</em>.
     *
     * @param interceptor an Interceptor
+    * @return this ServerLocator
     */
-   void addIncomingInterceptor(Interceptor interceptor);
+   ServerLocator addIncomingInterceptor(Interceptor interceptor);
 
    /**
     * Adds an interceptor which will be executed <em>before packets are sent 
to the server</em>.
     *
     * @param interceptor an Interceptor
+    * @return this ServerLocator
     */
-   void addOutgoingInterceptor(Interceptor interceptor);
+   ServerLocator addOutgoingInterceptor(Interceptor interceptor);
 
    /**
     * Removes an interceptor. Invoking this method is the same as invoking
@@ -741,12 +773,18 @@ public interface ServerLocator extends AutoCloseable
     * Sets whether to compress or not large messages.
     *
     * @param compressLargeMessages
+    * @return this ServerLocator
     */
-   void setCompressLargeMessage(boolean compressLargeMessages);
+   ServerLocator setCompressLargeMessage(boolean compressLargeMessages);
 
    // XXX No javadocs
-   void addClusterTopologyListener(ClusterTopologyListener listener);
+   ServerLocator addClusterTopologyListener(ClusterTopologyListener listener);
 
    // XXX No javadocs
    void removeClusterTopologyListener(ClusterTopologyListener listener);
+
+   ClientProtocolManagerFactory getProtocolManagerFactory();
+
+   void setProtocolManagerFactory(ClientProtocolManagerFactory 
protocolManager);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java
index 13cf953..cd06495 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/AddressSettingsInfo.java
@@ -50,6 +50,12 @@ public final class AddressSettingsInfo
 
    private final boolean sendToDLAOnNoRoute;
 
+   private final long slowConsumerThreshold;
+
+   private final long slowConsumerCheckPeriod;
+
+   private final String slowConsumerPolicy;
+
    // Static --------------------------------------------------------
 
    public static AddressSettingsInfo from(final String jsonString) throws 
Exception
@@ -67,7 +73,10 @@ public final class AddressSettingsInfo
                                      object.getString("expiryAddress"),
                                      object.getBoolean("lastValueQueue"),
                                      object.getLong("redistributionDelay"),
-                                     object.getBoolean("sendToDLAOnNoRoute"));
+                                     object.getBoolean("sendToDLAOnNoRoute"),
+                                     object.getLong("slowConsumerThreshold"),
+                                     object.getLong("slowConsumerCheckPeriod"),
+                                     object.getString("slowConsumerPolicy"));
    }
 
    // Constructors --------------------------------------------------
@@ -84,7 +93,10 @@ public final class AddressSettingsInfo
                               String expiryAddress,
                               boolean lastValueQueue,
                               long redistributionDelay,
-                              boolean sendToDLAOnNoRoute)
+                              boolean sendToDLAOnNoRoute,
+                              long slowConsumerThreshold,
+                              long slowConsumerCheckPeriod,
+                              String slowConsumerPolicy)
    {
       this.addressFullMessagePolicy = addressFullMessagePolicy;
       this.maxSizeBytes = maxSizeBytes;
@@ -99,6 +111,9 @@ public final class AddressSettingsInfo
       this.lastValueQueue = lastValueQueue;
       this.redistributionDelay = redistributionDelay;
       this.sendToDLAOnNoRoute = sendToDLAOnNoRoute;
+      this.slowConsumerThreshold = slowConsumerThreshold;
+      this.slowConsumerCheckPeriod = slowConsumerCheckPeriod;
+      this.slowConsumerPolicy = slowConsumerPolicy;
    }
 
    // Public --------------------------------------------------------
@@ -172,5 +187,20 @@ public final class AddressSettingsInfo
    {
       return maxRedeliveryDelay;
    }
+
+   public long getSlowConsumerThreshold()
+   {
+      return slowConsumerThreshold;
+   }
+
+   public long getSlowConsumerCheckPeriod()
+   {
+      return slowConsumerCheckPeriod;
+   }
+
+   public String getSlowConsumerPolicy()
+   {
+      return slowConsumerPolicy;
+   }
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/CoreNotificationType.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/CoreNotificationType.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/CoreNotificationType.java
new file mode 100644
index 0000000..d792a9a
--- /dev/null
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/CoreNotificationType.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.api.core.management;
+
+/**
+ * Enumerates the core notification types, each paired with the integer code returned by {@link #getType()}.
+ *
+ * @author <a href="mailto:[email protected]";>Howard Gao</a>
+ */
+public enum CoreNotificationType implements NotificationType
+{
+   BINDING_ADDED(0),
+   BINDING_REMOVED(1),
+   CONSUMER_CREATED(2),
+   CONSUMER_CLOSED(3),
+   SECURITY_AUTHENTICATION_VIOLATION(6), // codes 4 and 5 are not assigned to any constant in this enum
+   SECURITY_PERMISSION_VIOLATION(7),
+   DISCOVERY_GROUP_STARTED(8),
+   DISCOVERY_GROUP_STOPPED(9),
+   BROADCAST_GROUP_STARTED(10),
+   BROADCAST_GROUP_STOPPED(11),
+   BRIDGE_STARTED(12),
+   BRIDGE_STOPPED(13),
+   CLUSTER_CONNECTION_STARTED(14),
+   CLUSTER_CONNECTION_STOPPED(15),
+   ACCEPTOR_STARTED(16),
+   ACCEPTOR_STOPPED(17),
+   PROPOSAL(18),
+   PROPOSAL_RESPONSE(19),
+   UNPROPOSAL(20),
+   CONSUMER_SLOW(21);
+
+   private final int value; // integer code given to the constructor for this constant
+
+   private CoreNotificationType(final int value)
+   {
+      this.value = value;
+   }
+
+   public int getType()
+   {
+      return value; // the code declared next to the constant above
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java
index 0d7deca..ca566f7 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/HornetQServerControl.java
@@ -38,7 +38,7 @@ public interface HornetQServerControl
     * Returns the list of interceptors used by this server. Invoking this 
method is the same as invoking
     * <code>getIncomingInterceptorClassNames().</code>
     *
-    * @see Interceptor
+    * @see org.hornetq.api.core.Interceptor
     * @deprecated As of HornetQ 2.3.0.Final, replaced by
     * {@link #getIncomingInterceptorClassNames()} and
     * {@link #getOutgoingInterceptorClassNames()}
@@ -49,14 +49,14 @@ public interface HornetQServerControl
    /**
     * Returns the list of interceptors used by this server for incoming 
messages.
     *
-    * @see Interceptor
+    * @see org.hornetq.api.core.Interceptor
     */
    String[] getIncomingInterceptorClassNames();
 
    /**
     * Returns the list of interceptors used by this server for outgoing 
messages.
     *
-    * @see Interceptor
+    * @see org.hornetq.api.core.Interceptor
     */
    String[] getOutgoingInterceptorClassNames();
 
@@ -490,6 +490,18 @@ public interface HornetQServerControl
    boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name 
= "ipAddress") String ipAddress) throws Exception;
 
    /**
+    * Closes all the consumer connections for the given HornetQ address.
+    */
+   @Operation(desc = "Closes all the consumer connections for the given 
HornetQ address", impact = MBeanOperationInfo.INFO)
+   boolean closeConsumerConnectionsForAddress(@Parameter(desc = "a HornetQ 
address", name = "address") String address) throws Exception;
+
+   /**
+    * Closes all the connections of sessions with a matching user name.
+    */
+   @Operation(desc = "Closes all the connections for sessions with the given 
user name", impact = MBeanOperationInfo.INFO)
+   boolean closeConnectionsForUser(@Parameter(desc = "a user name", name = 
"userName") String address) throws Exception;
+
+   /**
     * Lists all the IDs of the connections connected to this server.
     */
    @Operation(desc = "List all the connection IDs", impact = 
MBeanOperationInfo.INFO)
@@ -546,7 +558,10 @@ public interface HornetQServerControl
                            @Parameter(desc = "the maximum redelivery delay", 
name = "maxRedeliveryDelay") long maxRedeliveryDelay,
                            @Parameter(desc = "the redistribution delay", name 
= "redistributionDelay") long redistributionDelay,
                            @Parameter(desc = "do we send to the DLA when there 
is no where to route the message", name = "sendToDLAOnNoRoute") boolean 
sendToDLAOnNoRoute,
-                           @Parameter(desc = "the ploicy to use when the 
address is full", name = "addressFullMessagePolicy") String 
addressFullMessagePolicy) throws Exception;
+                           @Parameter(desc = "the policy to use when the 
address is full", name = "addressFullMessagePolicy") String 
addressFullMessagePolicy,
+                           @Parameter(desc = "when a consumer falls below this 
threshold in terms of messages consumed per second it will be considered 
'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
+                           @Parameter(desc = "how often (in seconds) to check 
for slow consumers", name = "slowConsumerCheckPeriod") long 
slowConsumerCheckPeriod,
+                           @Parameter(desc = "the policy to use when a slow 
consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy) 
throws Exception;
 
    void removeAddressSettings(String addressMatch) throws Exception;
 
@@ -599,5 +614,8 @@ public interface HornetQServerControl
    void forceFailover() throws Exception;
 
    void updateDuplicateIdCache(String address, Object[] ids) throws Exception;
+
+   @Operation(desc = "force the server to stop and to scale down to another 
server", impact = MBeanOperationInfo.UNKNOWN)
+   void scaleDown(@Parameter(name = "name", desc = "The connector to use to 
scale down, if not provided the first appropriate connector will be 
used")String connector) throws Exception;
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ManagementHelper.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ManagementHelper.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ManagementHelper.java
index ffda2df..bacae4d 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ManagementHelper.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ManagementHelper.java
@@ -75,6 +75,10 @@ public final class ManagementHelper
 
    public static final SimpleString HDR_PROPOSAL_ALT_VALUE = new 
SimpleString("_JBM_ProposalAltValue");
 
+   public static final SimpleString HDR_CONSUMER_NAME = new 
SimpleString("_HQ_ConsumerName");
+
+   public static final SimpleString HDR_CONNECTION_NAME = new 
SimpleString("_HQ_ConnectionName");
+
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/NotificationType.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/NotificationType.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/NotificationType.java
index 2ec5dd3..ecabae7 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/NotificationType.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/NotificationType.java
@@ -24,37 +24,7 @@ package org.hornetq.api.core.management;
  * @see the HornetQ user manual section on "Management Notifications"
  * @author <a href="mailto:[email protected]";>Jeff Mesnil</a>
  */
-public enum NotificationType
+public interface NotificationType
 {
-   BINDING_ADDED(0),
-   BINDING_REMOVED(1),
-   CONSUMER_CREATED(2),
-   CONSUMER_CLOSED(3),
-   SECURITY_AUTHENTICATION_VIOLATION(6),
-   SECURITY_PERMISSION_VIOLATION(7),
-   DISCOVERY_GROUP_STARTED(8),
-   DISCOVERY_GROUP_STOPPED(9),
-   BROADCAST_GROUP_STARTED(10),
-   BROADCAST_GROUP_STOPPED(11),
-   BRIDGE_STARTED(12),
-   BRIDGE_STOPPED(13),
-   CLUSTER_CONNECTION_STARTED(14),
-   CLUSTER_CONNECTION_STOPPED(15),
-   ACCEPTOR_STARTED(16),
-   ACCEPTOR_STOPPED(17),
-   PROPOSAL(18),
-   PROPOSAL_RESPONSE(19),
-   UNPROPOSAL(20);
-
-   private final int value;
-
-   private NotificationType(final int value)
-   {
-      this.value = value;
-   }
-
-   public int intValue()
-   {
-      return value;
-   }
+   int getType();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java
index 0c43dad..4994902 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/ObjectNameBuilder.java
@@ -23,6 +23,7 @@ import org.hornetq.api.core.SimpleString;
  */
 public final class ObjectNameBuilder
 {
+
    // Constants -----------------------------------------------------
 
    /**
@@ -30,14 +31,13 @@ public final class ObjectNameBuilder
     */
    public static final ObjectNameBuilder DEFAULT = new 
ObjectNameBuilder(HornetQDefaultConfiguration.getDefaultJmxDomain());
 
-   static final String DEFAULT_SUBSYSTEM = "messaging";
-   static final String DEFAULT_SERVER = "default";
+   static final String JMS_MODULE = "JMS";
+
+   static final String CORE_MODULE = "Core";
 
    // Attributes ----------------------------------------------------
 
    private final String domain;
-   private final String subsystem;
-   private final String serverName;
 
    // Static --------------------------------------------------------
 
@@ -58,8 +58,6 @@ public final class ObjectNameBuilder
    private ObjectNameBuilder(final String domain)
    {
       this.domain = domain;
-      this.subsystem = DEFAULT_SUBSYSTEM;
-      this.serverName = DEFAULT_SERVER;
    }
 
    // Public --------------------------------------------------------
@@ -69,9 +67,7 @@ public final class ObjectNameBuilder
     */
    public ObjectName getHornetQServerObjectName() throws Exception
    {
-      String value = String.format("%s:subsystem=%s,hornetq-server=%s",
-            domain, subsystem, serverName);
-      return ObjectName.getInstance(value);
+      return ObjectName.getInstance(domain + ":module=Core,type=Server");
    }
 
    /**
@@ -81,7 +77,7 @@ public final class ObjectNameBuilder
     */
    public ObjectName getAddressObjectName(final SimpleString address) throws 
Exception
    {
-      return createObjectName("core-address", address.toString());
+      return createObjectName(ObjectNameBuilder.CORE_MODULE, "Address", 
address.toString());
    }
 
    /**
@@ -91,11 +87,12 @@ public final class ObjectNameBuilder
     */
    public ObjectName getQueueObjectName(final SimpleString address, final 
SimpleString name) throws Exception
    {
-      ObjectProperty[] props = new ObjectProperty[] {
-         new ObjectProperty("address", address.toString()),
-         new ObjectProperty("runtime-queue", name.toString())
-      };
-      return this.createObjectName(props);
+      return 
ObjectName.getInstance(String.format("%s:module=%s,type=%s,address=%s,name=%s",
+                                                  domain,
+                                                  
ObjectNameBuilder.CORE_MODULE,
+                                                  "Queue",
+                                                  
ObjectName.quote(address.toString()),
+                                                  
ObjectName.quote(name.toString())));
    }
 
    /**
@@ -105,7 +102,7 @@ public final class ObjectNameBuilder
     */
    public ObjectName getDivertObjectName(final String name) throws Exception
    {
-      return createObjectName("divert", name.toString());
+      return createObjectName(ObjectNameBuilder.CORE_MODULE, "Divert", 
name.toString());
    }
 
    /**
@@ -115,7 +112,7 @@ public final class ObjectNameBuilder
     */
    public ObjectName getAcceptorObjectName(final String name) throws Exception
    {
-      return createObjectName("remote-acceptor", name);
+      return createObjectName(ObjectNameBuilder.CORE_MODULE, "Acceptor", name);
    }
 
    /**
@@ -125,7 +122,7 @@ public final class ObjectNameBuilder
     */
    public ObjectName getBroadcastGroupObjectName(final String name) throws 
Exception
    {
-      return createObjectName("broadcast-group", name);
+      return createObjectName(ObjectNameBuilder.CORE_MODULE, "BroadcastGroup", 
name);
    }
 
    /**
@@ -135,7 +132,7 @@ public final class ObjectNameBuilder
     */
    public ObjectName getBridgeObjectName(final String name) throws Exception
    {
-      return createObjectName("bridge", name);
+      return createObjectName(ObjectNameBuilder.CORE_MODULE, "Bridge", name);
    }
 
    /**
@@ -145,7 +142,7 @@ public final class ObjectNameBuilder
     */
    public ObjectName getClusterConnectionObjectName(final String name) throws 
Exception
    {
-      return createObjectName("cluster-connection", name);
+      return createObjectName(ObjectNameBuilder.CORE_MODULE, 
"ClusterConnection", name);
    }
 
    /**
@@ -155,7 +152,7 @@ public final class ObjectNameBuilder
     */
    public ObjectName getDiscoveryGroupObjectName(final String name) throws 
Exception
    {
-      return createObjectName("discovery-group", name);
+      return createObjectName(ObjectNameBuilder.CORE_MODULE, "DiscoveryGroup", 
name);
    }
 
    /**
@@ -164,9 +161,7 @@ public final class ObjectNameBuilder
     */
    public ObjectName getJMSServerObjectName() throws Exception
    {
-      String value = String.format("%s:subsystem=%s,hornetq-server=%s",
-            domain, subsystem, serverName);
-      return ObjectName.getInstance(value);
+      return ObjectName.getInstance(domain + ":module=JMS,type=Server");
    }
 
    /**
@@ -175,7 +170,7 @@ public final class ObjectNameBuilder
     */
    public ObjectName getJMSQueueObjectName(final String name) throws Exception
    {
-      return createObjectName("jms-queue", name);
+      return createObjectName(ObjectNameBuilder.JMS_MODULE, "Queue", name);
    }
 
    /**
@@ -185,7 +180,7 @@ public final class ObjectNameBuilder
     */
    public ObjectName getJMSTopicObjectName(final String name) throws Exception
    {
-      return createObjectName("jms-topic", name);
+      return createObjectName(ObjectNameBuilder.JMS_MODULE, "Topic", name);
    }
 
    /**
@@ -194,38 +189,15 @@ public final class ObjectNameBuilder
     */
    public ObjectName getConnectionFactoryObjectName(final String name) throws 
Exception
    {
-      return createObjectName("connection-factory", name);
+      return createObjectName(ObjectNameBuilder.JMS_MODULE, 
"ConnectionFactory", name);
    }
 
-   private ObjectName createObjectName(String beanType, String beanName) 
throws Exception
+   private ObjectName createObjectName(final String module, final String type, 
final String name) throws Exception
    {
-      return this.createObjectName(new ObjectProperty[] {new 
ObjectProperty(beanType, beanName)});
-   }
-
-   private ObjectName createObjectName(ObjectProperty[] props) throws Exception
-   {
-      String baseValue = String.format("%s:subsystem=%s,hornetq-server=%s",
-                                   domain, subsystem, serverName);
-      StringBuilder builder = new StringBuilder(baseValue);
-      for (ObjectProperty p : props)
-      {
-         builder.append(",");
-         builder.append(p.key);
-         builder.append("=");
-         builder.append(ObjectName.quote(p.val));
-      }
-      return ObjectName.getInstance(builder.toString());
-   }
-
-   private static class ObjectProperty
-   {
-      String key;
-      String val;
-
-      public ObjectProperty(String k, String v)
-      {
-         this.key = k;
-         this.val = v;
-      }
+      return 
ObjectName.getInstance(String.format("%s:module=%s,type=%s,name=%s",
+                                                  domain,
+                                                  module,
+                                                  type,
+                                                  ObjectName.quote(name)));
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/QueueControl.java
 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/QueueControl.java
index f2c6895..d63ea15 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/api/core/management/QueueControl.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/api/core/management/QueueControl.java
@@ -81,6 +81,11 @@ public interface QueueControl
    long getMessagesAdded();
 
    /**
+    * Returns the number of messages acknowledged from this queue since it was created.
+    */
+   long getMessagesAcknowledged();
+
+   /**
     * Returns the first message on the queue as JSON
     */
    String getFirstMessageAsJSON() throws Exception;
@@ -356,4 +361,19 @@ public interface QueueControl
     */
    @Operation(desc = "Resets the MessagesAdded property", impact = 
MBeanOperationInfo.ACTION)
    void resetMessagesAdded() throws Exception;
+
+   /**
+    * Resets the MessagesAcknowledged property
+    */
+   @Operation(desc = "Resets the MessagesAcknowledged property", impact = 
MBeanOperationInfo.ACTION)
+   void resetMessagesAcknowledged() throws Exception;
+
+   /**
+    * Flushes one cycle on the internal executors, so you can be sure that 
any pending tasks are done before you call
+    * any other measure.
+    * It is useful if you need exact message counts.
+    * @throws Exception
+    */
+   void flushExecutor();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
 
b/hornetq-core-client/src/main/java/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
index a279096..9eae62e 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
@@ -28,7 +28,17 @@ public final class ResetLimitWrappedHornetQBuffer extends 
ChannelBufferWrapper
 {
    private final int limit;
 
-   private final MessageInternal message;
+   private MessageInternal message;
+
+   /**
+    * We need to turn off notifications of body changes on reset on the server 
side when dealing with AMQP conversions,
+    * for that reason this method allows the message to be set to null
+    * @param message
+    */
+   public void setMessage(MessageInternal message)
+   {
+      this.message = message;
+   }
 
    public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer 
buffer, final MessageInternal message)
    {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientLogger.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientLogger.java
 
b/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientLogger.java
index a1f053a..ca990e6 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientLogger.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientLogger.java
@@ -408,4 +408,8 @@ public interface HornetQClientLogger extends BasicLogger
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 214023, value = "HTTP Handshake failed, the received accept 
value %s does not match the expected response %s")
    void httpHandshakeFailed(String response, String expectedResponse);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 214024, value = "HTTP upgrade not supported by remote 
acceptor")
+   void httpUpgradeNotSupportedByRemoteAcceptor();
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientMessageBundle.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientMessageBundle.java
 
b/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientMessageBundle.java
index 77bada8..e426a63 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientMessageBundle.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/core/client/HornetQClientMessageBundle.java
@@ -102,7 +102,7 @@ public interface HornetQClientMessageBundle
 
    @Message(id = 119016, value =  "Connection failure detected. Unblocking a 
blocking call that will never get a resp" +
          "onse", format = Message.Format.MESSAGE_FORMAT)
-   HornetQUnBlockedException unblockingACall();
+   HornetQUnBlockedException unblockingACall(@Cause Throwable t);
 
    @Message(id = 119017, value =  "Consumer is closed", format = 
Message.Format.MESSAGE_FORMAT)
    HornetQObjectClosedException consumerClosed();
@@ -241,4 +241,7 @@ public interface HornetQClientMessageBundle
          , format = Message.Format.MESSAGE_FORMAT)
    HornetQLargeMessageInterruptedException largeMessageInterrupted();
 
+   @Message(id = 119061, value =  "error decoding AMQP frame", format = 
Message.Format.MESSAGE_FORMAT)
+   String decodeError();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
 
b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
index ac719a8..7933245 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
@@ -32,6 +32,7 @@ import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.HornetQClientLogger;
 import org.hornetq.core.client.HornetQClientMessageBundle;
+import org.hornetq.spi.core.remoting.ConsumerContext;
 import org.hornetq.spi.core.remoting.SessionContext;
 import org.hornetq.utils.FutureLatch;
 import org.hornetq.utils.PriorityLinkedList;
@@ -67,7 +68,7 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal
 
    private final SessionContext sessionContext;
 
-   private final long id;
+   private final ConsumerContext consumerContext;
 
    private final SimpleString filterString;
 
@@ -137,7 +138,7 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal
    // 
---------------------------------------------------------------------------------
 
    public ClientConsumerImpl(final ClientSessionInternal session,
-                             final long id,
+                             final ConsumerContext consumerContext,
                              final SimpleString queueName,
                              final SimpleString filterString,
                              final boolean browseOnly,
@@ -150,7 +151,7 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal
                              final ClientSession.QueueQuery queueInfo,
                              final ClassLoader contextClassLoader)
    {
-      this.id = id;
+      this.consumerContext = consumerContext;
 
       this.queueName = queueName;
 
@@ -180,9 +181,9 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal
    // ClientConsumer implementation
    // -----------------------------------------------------------------
 
-   public Object getId()
+   public ConsumerContext getConsumerContext()
    {
-      return id;
+      return consumerContext;
    }
 
    private ClientMessage receive(final long timeout, final boolean 
forcingDelivery) throws HornetQException
@@ -421,7 +422,7 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal
 
    // Must be synchronized since messages may be arriving while handler is 
being set and might otherwise end
    // up not queueing enough executors - so messages get stranded
-   public synchronized void setMessageHandler(final MessageHandler theHandler) 
throws HornetQException
+   public synchronized ClientConsumerImpl setMessageHandler(final 
MessageHandler theHandler) throws HornetQException
    {
       checkClosed();
 
@@ -449,6 +450,8 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal
       {
          waitForOnMessageToComplete(true);
       }
+
+      return this;
    }
 
    public void close() throws HornetQException
@@ -457,29 +460,28 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal
    }
 
    /**
-    * To be used by MDBs
+    * To be used by MDBs to stop any more handling of messages.
     *
     * @throws HornetQException
+    * @param future the future to run once the onMessage Thread has completed
     */
-   public void interruptHandlers() throws HornetQException
+   public Thread prepareForClose(final FutureLatch future) throws 
HornetQException
    {
       closing = true;
 
       resetLargeMessageController();
 
-      Thread onThread = onMessageThread;
-      if (onThread != null)
+      //execute the future after the last onMessage call
+      sessionExecutor.execute(new Runnable()
       {
-         try
-         {
-            // just trying to interrupt any ongoing messages
-            onThread.interrupt();
-         }
-         catch (Throwable ignored)
+         @Override
+         public void run()
          {
-            // security exception probably.. we just ignore it, not big deal!
+            future.run();
          }
-      }
+      });
+
+      return onMessageThread;
    }
 
    public void cleanUp()
@@ -558,11 +560,6 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal
       return queueInfo;
    }
 
-   public long getID()
-   {
-      return id;
-   }
-
    public SimpleString getFilterString()
    {
       return filterString;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java
 
b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java
index c0b4c1e..5b198f5 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientConsumerInternal.java
@@ -17,6 +17,7 @@ import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.utils.FutureLatch;
 
 /**
  * A ClientConsumerInternal
@@ -25,8 +26,6 @@ import org.hornetq.api.core.client.ClientSession;
  */
 public interface ClientConsumerInternal extends ClientConsumer
 {
-   long getID();
-
    SimpleString getQueueName();
 
    SimpleString getFilterString();
@@ -47,8 +46,9 @@ public interface ClientConsumerInternal extends ClientConsumer
     * To be called by things like MDBs during shutdown of the server
     *
     * @throws HornetQException
+    * @param future the future to run once the onMessage Thread has completed
     */
-   void interruptHandlers() throws HornetQException;
+   Thread prepareForClose(FutureLatch future) throws HornetQException;
 
    void clearAtFailover();
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
 
b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
index 65adfbc..09a7503 100644
--- 
a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
+++ 
b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientLargeMessageImpl.java
@@ -131,7 +131,7 @@ public final class ClientLargeMessageImpl extends 
ClientMessageImpl implements C
    }
 
    @Override
-   public void setOutputStream(final OutputStream out) throws HornetQException
+   public ClientLargeMessageImpl setOutputStream(final OutputStream out) 
throws HornetQException
    {
       if (bodyBuffer != null)
       {
@@ -141,6 +141,8 @@ public final class ClientLargeMessageImpl extends 
ClientMessageImpl implements C
       {
          largeMessageController.setOutputStream(out);
       }
+
+      return this;
    }
 
    @Override

Reply via email to