http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/Role.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/Role.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/Role.java
new file mode 100644
index 0000000..30348cb
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/Role.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.security;
+
+import java.io.Serializable;
+
+/**
+ * A role is used by the security store to define access rights and is 
configured on a connection factory or an address.
+ *
+ * @author <a href="[email protected]">Andy Taylor</a>
+ */
+public class Role implements Serializable
+{
+   private static final long serialVersionUID = 3560097227776448872L;
+
+   private final String name;
+
+   private final boolean send;
+
+   private final boolean consume;
+
+   private final boolean createDurableQueue;
+
+   private final boolean deleteDurableQueue;
+
+   private final boolean createNonDurableQueue;
+
+   private final boolean deleteNonDurableQueue;
+
+   private final boolean manage;
+
+   public Role(final String name,
+               final boolean send,
+               final boolean consume,
+               final boolean createDurableQueue,
+               final boolean deleteDurableQueue,
+               final boolean createNonDurableQueue,
+               final boolean deleteNonDurableQueue,
+               final boolean manage)
+   {
+      if (name == null)
+      {
+         throw new NullPointerException("name is null");
+      }
+      this.name = name;
+      this.send = send;
+      this.consume = consume;
+      this.createDurableQueue = createDurableQueue;
+      this.deleteDurableQueue = deleteDurableQueue;
+      this.createNonDurableQueue = createNonDurableQueue;
+      this.deleteNonDurableQueue = deleteNonDurableQueue;
+      this.manage = manage;
+   }
+
+   public String getName()
+   {
+      return name;
+   }
+
+   public boolean isSend()
+   {
+      return send;
+   }
+
+   public boolean isConsume()
+   {
+      return consume;
+   }
+
+   public boolean isCreateDurableQueue()
+   {
+      return createDurableQueue;
+   }
+
+   public boolean isDeleteDurableQueue()
+   {
+      return deleteDurableQueue;
+   }
+
+   public boolean isCreateNonDurableQueue()
+   {
+      return createNonDurableQueue;
+   }
+
+   public boolean isDeleteNonDurableQueue()
+   {
+      return deleteNonDurableQueue;
+   }
+
+   @Override
+   public String toString()
+   {
+      StringBuffer stringReturn = new StringBuffer("Role {name=" + name + "; 
allows=[");
+
+      if (send)
+      {
+         stringReturn.append(" send ");
+      }
+      if (consume)
+      {
+         stringReturn.append(" consume ");
+      }
+      if (createDurableQueue)
+      {
+         stringReturn.append(" createDurableQueue ");
+      }
+      if (deleteDurableQueue)
+      {
+         stringReturn.append(" deleteDurableQueue ");
+      }
+      if (createNonDurableQueue)
+      {
+         stringReturn.append(" createNonDurableQueue ");
+      }
+      if (deleteNonDurableQueue)
+      {
+         stringReturn.append(" deleteNonDurableQueue ");
+      }
+
+      stringReturn.append("]}");
+
+      return stringReturn.toString();
+   }
+
+   @Override
+   public boolean equals(final Object o)
+   {
+      if (this == o)
+      {
+         return true;
+      }
+      if (o == null || getClass() != o.getClass())
+      {
+         return false;
+      }
+
+      Role role = (Role)o;
+
+      if (consume != role.consume)
+      {
+         return false;
+      }
+      if (createDurableQueue != role.createDurableQueue)
+      {
+         return false;
+      }
+      if (createNonDurableQueue != role.createNonDurableQueue)
+      {
+         return false;
+      }
+      if (deleteDurableQueue != role.deleteDurableQueue)
+      {
+         return false;
+      }
+      if (deleteNonDurableQueue != role.deleteNonDurableQueue)
+      {
+         return false;
+      }
+      if (send != role.send)
+      {
+         return false;
+      }
+      if (!name.equals(role.name))
+      {
+         return false;
+      }
+
+      return true;
+   }
+
+   @Override
+   public int hashCode()
+   {
+      int result;
+      result = name.hashCode();
+      result = 31 * result + (send ? 1 : 0);
+      result = 31 * result + (consume ? 1 : 0);
+      result = 31 * result + (createDurableQueue ? 1 : 0);
+      result = 31 * result + (deleteDurableQueue ? 1 : 0);
+      result = 31 * result + (createNonDurableQueue ? 1 : 0);
+      result = 31 * result + (deleteNonDurableQueue ? 1 : 0);
+      return result;
+   }
+
+   public boolean isManage()
+   {
+      return manage;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/QueueQueryResult.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/QueueQueryResult.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/QueueQueryResult.java
new file mode 100644
index 0000000..9fba0b8
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/QueueQueryResult.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.server;
+
+import org.apache.activemq6.api.core.SimpleString;
+
+/**
+ *
+ * A QueueQueryResult
+ *
+ * @author <a href="mailto:[email protected]";>Tim Fox</a>
+ *
+ */
+public class QueueQueryResult
+{
+   private SimpleString name;
+
+   private boolean exists;
+
+   private boolean durable;
+
+   private int consumerCount;
+
+   private long messageCount;
+
+   private SimpleString filterString;
+
+   private SimpleString address;
+
+   private boolean temporary;
+
+   public QueueQueryResult(final SimpleString name,
+                                           final SimpleString address,
+                                           final boolean durable,
+                                           final boolean temporary,
+                                           final SimpleString filterString,
+                                           final int consumerCount,
+                                           final long messageCount)
+   {
+      this(name, address, durable, temporary, filterString, consumerCount, 
messageCount, true);
+   }
+
+   public QueueQueryResult()
+   {
+      this(null, null, false, false, null, 0, 0, false);
+   }
+
+   private QueueQueryResult(final SimpleString name,
+                                            final SimpleString address,
+                                            final boolean durable,
+                                            final boolean temporary,
+                                            final SimpleString filterString,
+                                            final int consumerCount,
+                                            final long messageCount,
+                                            final boolean exists)
+   {
+      this.durable = durable;
+
+      this.temporary = temporary;
+
+      this.consumerCount = consumerCount;
+
+      this.messageCount = messageCount;
+
+      this.filterString = filterString;
+
+      this.address = address;
+
+      this.name = name;
+
+      this.exists = exists;
+   }
+
+   public boolean isExists()
+   {
+      return exists;
+   }
+
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   public int getConsumerCount()
+   {
+      return consumerCount;
+   }
+
+   public long getMessageCount()
+   {
+      return messageCount;
+   }
+
+   public SimpleString getFilterString()
+   {
+      return filterString;
+   }
+
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   public boolean isTemporary()
+   {
+      return temporary;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/Notification.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/Notification.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/Notification.java
new file mode 100644
index 0000000..214dfdf
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/Notification.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.server.management;
+
+import org.apache.activemq6.api.core.management.NotificationType;
+import org.apache.activemq6.utils.TypedProperties;
+
+/**
+ * A Notification
+ * @see org.apache.activemq6.core.server.management.NotificationListener
+ * @see NotificationType
+ * @author <a href="mailto:[email protected]";>Tim Fox</a> Created 22 Jan 2009 
16:41:12
+ */
+public final class Notification
+{
+   private final NotificationType type;
+
+   private final TypedProperties properties;
+
+   private final String uid;
+
+   public Notification(final String uid, final NotificationType type, final 
TypedProperties properties)
+   {
+      this.uid = uid;
+      this.type = type;
+      this.properties = properties;
+   }
+
+   public NotificationType getType()
+   {
+      return type;
+   }
+
+   public TypedProperties getProperties()
+   {
+      return properties;
+   }
+
+   public String getUID()
+   {
+      return uid;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "Notification[uid=" + uid + ", type=" + type + ", properties=" + 
properties + "]";
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationListener.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationListener.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationListener.java
new file mode 100644
index 0000000..f20b1af
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationListener.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.server.management;
+
+
+/**
+ * A NotificationListener
+ *
+ * @author <a href="mailto:[email protected]";>Tim Fox</a>
+ *
+ * Created 22 Jan 2009 16:48:27
+ *
+ *
+ */
+public interface NotificationListener
+{
+   void onNotification(Notification notification);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationService.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationService.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationService.java
new file mode 100644
index 0000000..8cedaff
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationService.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.server.management;
+
+/**
+ * A NotificationService
+ *
+ * @author jmesnil
+ */
+public interface NotificationService
+{
+   /**
+    * the message corresponding to a notification will always contain the 
properties:
+    * <ul>
+    * <li><code>ManagementHelper.HDR_NOTIFICATION_TYPE</code> - the type of 
notification (SimpleString)</li>
+    * <li><code>ManagementHelper.HDR_NOTIFICATION_MESSAGE</code> - a message 
contextual to the notification (SimpleString)</li>
+    * <li><code>ManagementHelper.HDR_NOTIFICATION_TIMESTAMP</code> - the 
timestamp when the notification occurred (long)</li>
+    * </ul>
+    * in addition to the properties defined in <code>props</code>
+    *
+    * @see ManagementHelper
+    */
+   void sendNotification(Notification notification) throws Exception;
+
+   void enableNotifications(boolean enable);
+
+   void addNotificationListener(NotificationListener listener);
+
+   void removeNotificationListener(NotificationListener listener);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/settings/impl/AddressFullMessagePolicy.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/core/settings/impl/AddressFullMessagePolicy.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/settings/impl/AddressFullMessagePolicy.java
new file mode 100644
index 0000000..2904027
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/settings/impl/AddressFullMessagePolicy.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.settings.impl;
+
+/**
+ * A AddressFullMessagePolicy
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public enum AddressFullMessagePolicy
+{
+   DROP, PAGE, BLOCK, FAIL;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/transaction/impl/XidImpl.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/core/transaction/impl/XidImpl.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/transaction/impl/XidImpl.java
new file mode 100644
index 0000000..e2963a7
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/transaction/impl/XidImpl.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.transaction.impl;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq6.utils.Base64;
+
+/**
+ *
+ * Xid implementation
+ *
+ * @author <a href="mailto:[email protected]";>Adrian Brock</a>
+ * @author <a href="mailto:[email protected]";>Tim Fox</a>
+ * @author <a href="mailto:[email protected]";>Juha Lindfors</a>
+ *
+ * @version $Revision 1.1 $
+ */
+public class XidImpl implements Xid, Serializable
+{
+   private static final long serialVersionUID = 407053232840068514L;
+
+   private final byte[] branchQualifier;
+
+   private final int formatId;
+
+   private final byte[] globalTransactionId;
+
+   private int hash;
+
+   private boolean hashCalculated;
+
+   // Static --------------------------------------------------------
+
+   public static String toBase64String(final Xid xid)
+   {
+      byte[] data = XidImpl.toByteArray(xid);
+      return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES 
| Base64.URL_SAFE);
+   }
+
+   private static byte[] toByteArray(final Xid xid)
+   {
+      byte[] branchQualifier = xid.getBranchQualifier();
+      byte[] globalTransactionId = xid.getGlobalTransactionId();
+      int formatId = xid.getFormatId();
+
+      byte[] hashBytes = new byte[branchQualifier.length + 
globalTransactionId.length + 4];
+      System.arraycopy(branchQualifier, 0, hashBytes, 0, 
branchQualifier.length);
+      System.arraycopy(globalTransactionId, 0, hashBytes, 
branchQualifier.length, globalTransactionId.length);
+      byte[] intBytes = new byte[4];
+      for (int i = 0; i < 4; i++)
+      {
+         intBytes[i] = (byte)((formatId >> i * 8) % 0xFF);
+      }
+      System.arraycopy(intBytes, 0, hashBytes, branchQualifier.length + 
globalTransactionId.length, 4);
+      return hashBytes;
+   }
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Standard constructor
+    * @param branchQualifier
+    * @param formatId
+    * @param globalTransactionId
+    */
+   public XidImpl(final byte[] branchQualifier, final int formatId, final 
byte[] globalTransactionId)
+   {
+      this.branchQualifier = branchQualifier;
+      this.formatId = formatId;
+      this.globalTransactionId = globalTransactionId;
+   }
+
+   /**
+    * Copy constructor
+    * @param other
+    */
+   public XidImpl(final Xid other)
+   {
+      branchQualifier = copyBytes(other.getBranchQualifier());
+      formatId = other.getFormatId();
+      globalTransactionId = copyBytes(other.getGlobalTransactionId());
+   }
+
+   // Xid implementation 
------------------------------------------------------------------
+
+   public byte[] getBranchQualifier()
+   {
+      return branchQualifier;
+   }
+
+   public int getFormatId()
+   {
+      return formatId;
+   }
+
+   public byte[] getGlobalTransactionId()
+   {
+      return globalTransactionId;
+   }
+
+   // Public 
-------------------------------------------------------------------------------
+
+   @Override
+   public int hashCode()
+   {
+      if (!hashCalculated)
+      {
+         calcHash();
+      }
+      return hash;
+   }
+
+   @Override
+   public boolean equals(final Object other)
+   {
+      if (this == other)
+      {
+         return true;
+      }
+      if (!(other instanceof Xid))
+      {
+         return false;
+      }
+      Xid xother = (Xid)other;
+      if (xother.getFormatId() != formatId)
+      {
+         return false;
+      }
+      if (xother.getBranchQualifier().length != branchQualifier.length)
+      {
+         return false;
+      }
+      if (xother.getGlobalTransactionId().length != globalTransactionId.length)
+      {
+         return false;
+      }
+      for (int i = 0; i < branchQualifier.length; i++)
+      {
+         byte[] otherBQ = xother.getBranchQualifier();
+         if (branchQualifier[i] != otherBQ[i])
+         {
+            return false;
+         }
+      }
+      for (int i = 0; i < globalTransactionId.length; i++)
+      {
+         byte[] otherGtx = xother.getGlobalTransactionId();
+         if (globalTransactionId[i] != otherGtx[i])
+         {
+            return false;
+         }
+      }
+      return true;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "XidImpl (" + System.identityHashCode(this) +
+             " bq:" +
+             stringRep(branchQualifier) +
+             " formatID:" +
+             formatId +
+             " gtxid:" +
+             stringRep(globalTransactionId) +
+             " base64:" + toBase64String(this);
+   }
+
+   // Private 
-------------------------------------------------------------------------------
+
+   private String stringRep(final byte[] bytes)
+   {
+      StringBuffer buff = new StringBuffer();
+      for (int i = 0; i < bytes.length; i++)
+      {
+         byte b = bytes[i];
+
+         buff.append(b);
+
+         if (i != bytes.length - 1)
+         {
+            buff.append('.');
+         }
+      }
+
+      return buff.toString();
+   }
+
+   private void calcHash()
+   {
+      byte[] hashBytes = XidImpl.toByteArray(this);
+      hash = Arrays.hashCode(hashBytes);
+      hashCalculated = true;
+   }
+
+   private byte[] copyBytes(final byte[] other)
+   {
+      byte[] bytes = new byte[other.length];
+
+      System.arraycopy(other, 0, bytes, 0, other.length);
+
+      return bytes;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/Version.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/Version.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/Version.java
new file mode 100644
index 0000000..783a7a6
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/Version.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.version;
+
+/**
+ *
+ * A VersionImpl
+ *
+ * @author <a href="mailto:[email protected]";>Tim Fox</a>
+ *
+ */
+public interface Version
+{
+   String getFullVersion();
+
+   String getVersionName();
+
+   int getMajorVersion();
+
+   int getMinorVersion();
+
+   int getMicroVersion();
+
+   String getVersionSuffix();
+
+   int getIncrementingVersion();
+
+   boolean isCompatible(int v);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/impl/VersionImpl.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/impl/VersionImpl.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/impl/VersionImpl.java
new file mode 100644
index 0000000..7295dfb
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/impl/VersionImpl.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.version.impl;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import org.apache.activemq6.core.version.Version;
+
+/**
+ * A VersionImpl
+ *
+ * @author <a href="mailto:[email protected]";>Tim Fox</a>
+ */
+public class VersionImpl implements Version, Serializable
+{
+   private static final long serialVersionUID = -5271227256591080403L;
+
+   private final String versionName;
+
+   private final int majorVersion;
+
+   private final int minorVersion;
+
+   private final int microVersion;
+
+   private final int incrementingVersion;
+
+   private final String versionSuffix;
+
+   private final int[] compatibleVersionList;
+
+   // Constructors --------------------------------------------------
+
+   public VersionImpl(final String versionName,
+                      final int majorVersion,
+                      final int minorVersion,
+                      final int microVersion,
+                      final int incrementingVersion,
+                      final String versionSuffix,
+                      final int[] compatibleVersionList)
+   {
+      this.versionName = versionName;
+
+      this.majorVersion = majorVersion;
+
+      this.minorVersion = minorVersion;
+
+      this.microVersion = microVersion;
+
+      this.incrementingVersion = incrementingVersion;
+
+      this.versionSuffix = versionSuffix;
+
+      this.compatibleVersionList = Arrays.copyOf(compatibleVersionList, 
compatibleVersionList.length);
+   }
+
+   // Version implementation ------------------------------------------
+
+   public String getFullVersion()
+   {
+      return majorVersion + "." +
+             minorVersion +
+             "." +
+             microVersion +
+             "." +
+             versionSuffix +
+             " (" +
+             versionName +
+             ", " +
+             incrementingVersion +
+             ")";
+   }
+
+   public String getVersionName()
+   {
+      return versionName;
+   }
+
+   public int getMajorVersion()
+   {
+      return majorVersion;
+   }
+
+   public int getMinorVersion()
+   {
+      return minorVersion;
+   }
+
+   public int getMicroVersion()
+   {
+      return microVersion;
+   }
+
+   public String getVersionSuffix()
+   {
+      return versionSuffix;
+   }
+
+   public int getIncrementingVersion()
+   {
+      return incrementingVersion;
+   }
+
+   public boolean isCompatible(int version)
+   {
+      for (int element : compatibleVersionList)
+      {
+         if (element == version)
+         {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + Arrays.hashCode(compatibleVersionList);
+      result = prime * result + incrementingVersion;
+      result = prime * result + majorVersion;
+      result = prime * result + microVersion;
+      result = prime * result + minorVersion;
+      result = prime * result + ((versionName == null) ? 0 : 
versionName.hashCode());
+      result = prime * result + ((versionSuffix == null) ? 0 : 
versionSuffix.hashCode());
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+      {
+         return true;
+      }
+      if (obj == null)
+      {
+         return false;
+      }
+      if (!(obj instanceof VersionImpl))
+      {
+         return false;
+      }
+      VersionImpl other = (VersionImpl)obj;
+      if (!Arrays.equals(compatibleVersionList, other.compatibleVersionList))
+      {
+         return false;
+      }
+      if (incrementingVersion != other.incrementingVersion)
+      {
+         return false;
+      }
+      if (majorVersion != other.majorVersion)
+      {
+         return false;
+      }
+      if (microVersion != other.microVersion)
+      {
+         return false;
+      }
+      if (minorVersion != other.minorVersion)
+      {
+         return false;
+      }
+      if (versionName == null)
+      {
+         if (other.versionName != null)
+         {
+            return false;
+         }
+      }
+      else if (!versionName.equals(other.versionName))
+      {
+         return false;
+      }
+      if (versionSuffix == null)
+      {
+         if (other.versionSuffix != null)
+         {
+            return false;
+         }
+      }
+      else if (!versionSuffix.equals(other.versionSuffix))
+      {
+         return false;
+      }
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/BytesMessageUtil.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/BytesMessageUtil.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/BytesMessageUtil.java
new file mode 100644
index 0000000..21bc209
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/BytesMessageUtil.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.reader;
+
+import org.apache.activemq6.api.core.Message;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class BytesMessageUtil extends MessageUtil
+{
+
+   public static boolean bytesReadBoolean(Message message)
+   {
+      return getBodyBuffer(message).readBoolean();
+   }
+
+   public static byte bytesReadByte(Message message)
+   {
+      return getBodyBuffer(message).readByte();
+   }
+
+   public static int bytesReadUnsignedByte(Message message)
+   {
+      return getBodyBuffer(message).readUnsignedByte();
+   }
+
+   public static short bytesReadShort(Message message)
+   {
+      return getBodyBuffer(message).readShort();
+   }
+
+   public static int bytesReadUnsignedShort(Message message)
+   {
+      return getBodyBuffer(message).readUnsignedShort();
+   }
+
+   public static char bytesReadChar(Message message)
+   {
+      return (char)getBodyBuffer(message).readShort();
+   }
+
+   public static int bytesReadInt(Message message)
+   {
+      return getBodyBuffer(message).readInt();
+   }
+
+   public static long bytesReadLong(Message message)
+   {
+      return getBodyBuffer(message).readLong();
+   }
+
+   public static float bytesReadFloat(Message message)
+   {
+      return Float.intBitsToFloat(getBodyBuffer(message).readInt());
+   }
+
+   public static double bytesReadDouble(Message message)
+   {
+      return Double.longBitsToDouble(getBodyBuffer(message).readLong());
+   }
+
+   public static String bytesReadUTF(Message message)
+   {
+      return getBodyBuffer(message).readUTF();
+   }
+
+
+
+   public static int bytesReadBytes(Message message, final byte[] value)
+   {
+      return bytesReadBytes(message, value, value.length);
+   }
+
+   public static int bytesReadBytes(Message message, final byte[] value, final 
int length)
+   {
+      if (!getBodyBuffer(message).readable())
+      {
+         return -1;
+      }
+
+      int read = Math.min(length, getBodyBuffer(message).readableBytes());
+
+      if (read != 0)
+      {
+         getBodyBuffer(message).readBytes(value, 0, read);
+      }
+
+      return read;
+
+   }
+
+
+   public static void bytesWriteBoolean(Message message, boolean value)
+   {
+      getBodyBuffer(message).writeBoolean(value);
+   }
+
+
+
+   public static void bytesWriteByte(Message message, byte value)
+   {
+      getBodyBuffer(message).writeByte(value);
+   }
+
+
+
+   public static void bytesWriteShort(Message message, short value)
+   {
+      getBodyBuffer(message).writeShort(value);
+   }
+
+
+   public static void bytesWriteChar(Message message, char value)
+   {
+      getBodyBuffer(message).writeShort((short)value);
+   }
+
+   public static void bytesWriteInt(Message message, int value)
+   {
+      getBodyBuffer(message).writeInt(value);
+   }
+
+   public static void bytesWriteLong(Message message, long value)
+   {
+      getBodyBuffer(message).writeLong(value);
+   }
+
+   public static void bytesWriteFloat(Message message, float value)
+   {
+      getBodyBuffer(message).writeInt(Float.floatToIntBits(value));
+   }
+
+   public static void bytesWriteDouble(Message message, double value)
+   {
+      getBodyBuffer(message).writeLong(Double.doubleToLongBits(value));
+   }
+
+   public static void bytesWriteUTF(Message message, String value)
+   {
+      getBodyBuffer(message).writeUTF(value);
+   }
+
+   public static void bytesWriteBytes(Message message, byte[] value)
+   {
+      getBodyBuffer(message).writeBytes(value);
+   }
+
+   public static void bytesWriteBytes(Message message, final byte[] value, 
final int offset, final int length)
+   {
+      getBodyBuffer(message).writeBytes(value, offset, length);
+   }
+
+
+   /**
+    * Returns true if it could send the Object to any known format
+    * @param message
+    * @param value
+    * @return
+    */
+   public static boolean bytesWriteObject(Message message, Object value)
+   {
+      if (value == null)
+      {
+         throw new NullPointerException("Attempt to write a null value");
+      }
+      if (value instanceof String)
+      {
+         bytesWriteUTF(message, (String) value);
+      }
+      else if (value instanceof Boolean)
+      {
+         bytesWriteBoolean(message, (Boolean) value);
+      }
+      else if (value instanceof Character)
+      {
+         bytesWriteChar(message, (Character) value);
+      }
+      else if (value instanceof Byte)
+      {
+         bytesWriteByte(message, (Byte) value);
+      }
+      else if (value instanceof Short)
+      {
+         bytesWriteShort(message, (Short) value);
+      }
+      else if (value instanceof Integer)
+      {
+         bytesWriteInt(message, (Integer) value);
+      }
+      else if (value instanceof Long)
+      {
+         bytesWriteLong(message, (Long) value);
+      }
+      else if (value instanceof Float)
+      {
+         bytesWriteFloat(message, (Float) value);
+      }
+      else if (value instanceof Double)
+      {
+         bytesWriteDouble(message, (Double) value);
+      }
+      else if (value instanceof byte[])
+      {
+         bytesWriteBytes(message, (byte[]) value);
+      }
+      else
+      {
+         return false;
+      }
+
+
+      return true;
+   }
+
+   public static void bytesMessageReset(Message message)
+   {
+      getBodyBuffer(message).resetReaderIndex();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MapMessageUtil.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MapMessageUtil.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MapMessageUtil.java
new file mode 100644
index 0000000..734b413
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MapMessageUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.reader;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.utils.TypedProperties;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class MapMessageUtil extends MessageUtil
+{
+
+   /**
+    * Utility method to set the map on a message body
+    */
+   public static void writeBodyMap(Message message, TypedProperties properties)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      buff.resetWriterIndex();
+      properties.encode(buff);
+   }
+
+   /**
+    * Utility method to set the map on a message body
+    */
+   public static TypedProperties readBodyMap(Message message)
+   {
+      TypedProperties map = new TypedProperties();
+      readBodyMap(message, map);
+      return map;
+   }
+
+   /**
+    * Utility method to set the map on a message body
+    */
+   public static void readBodyMap(Message message, TypedProperties map)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      buff.resetReaderIndex();
+      map.decode(buff);
+   }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MessageUtil.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MessageUtil.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MessageUtil.java
new file mode 100644
index 0000000..1b0bba1
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MessageUtil.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.reader;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQPropertyConversionException;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+
+/**
+ * static methods intended for import static on JMS like messages.
+ *
+ * This provides a helper for core message to act some of the JMS functions 
used by the JMS wrapper
+ *
+ * @author Clebert Suconic
+ */
+
+public class MessageUtil
+{
+   public static final SimpleString CORRELATIONID_HEADER_NAME = new 
SimpleString("JMSCorrelationID");
+
+   public static final SimpleString REPLYTO_HEADER_NAME = new 
SimpleString("JMSReplyTo");
+
+   public static final SimpleString TYPE_HEADER_NAME = new 
SimpleString("JMSType");
+
+   public static final SimpleString JMS = new SimpleString("JMS");
+
+   public static final SimpleString JMSX = new SimpleString("JMSX");
+
+   public static final SimpleString JMS_ = new SimpleString("JMS_");
+
+   public static final String JMSXDELIVERYCOUNT = "JMSXDeliveryCount";
+
+   public static final String JMSXGROUPID = "JMSXGroupID";
+
+   public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new 
SimpleString("__HQ_CID");
+
+
+
+   public static HornetQBuffer getBodyBuffer(Message message)
+   {
+      return message.getBodyBuffer();
+   }
+
+
+
+   public static byte[] getJMSCorrelationIDAsBytes(Message message)
+   {
+      Object obj = message.getObjectProperty(CORRELATIONID_HEADER_NAME);
+
+      if (obj instanceof byte[])
+      {
+         return (byte[])obj;
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+
+
+   public static void setJMSType(Message message, String type)
+   {
+      message.putStringProperty(TYPE_HEADER_NAME, new SimpleString(type));
+   }
+
+   public static String getJMSType(Message message)
+   {
+      SimpleString ss = message.getSimpleStringProperty(TYPE_HEADER_NAME);
+
+      if (ss != null)
+      {
+         return ss.toString();
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+
+   public static final void setJMSCorrelationIDAsBytes(Message message, final 
byte[] correlationID) throws HornetQException
+   {
+      if (correlationID == null || correlationID.length == 0)
+      {
+         throw new HornetQException("Please specify a non-zero length byte[]");
+      }
+      message.putBytesProperty(CORRELATIONID_HEADER_NAME, correlationID);
+   }
+
+   public static void setJMSCorrelationID(Message message, final String 
correlationID)
+   {
+      if (correlationID == null)
+      {
+         message.removeProperty(CORRELATIONID_HEADER_NAME);
+      }
+      else
+      {
+         message.putStringProperty(CORRELATIONID_HEADER_NAME, new 
SimpleString(correlationID));
+      }
+   }
+
+   public static String getJMSCorrelationID(Message message)
+   {
+      try
+      {
+         return message.getStringProperty(CORRELATIONID_HEADER_NAME);
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         return null;
+      }
+   }
+
+
+   public static SimpleString getJMSReplyTo(Message message)
+   {
+      return message.getSimpleStringProperty(REPLYTO_HEADER_NAME);
+   }
+
+   public static void setJMSReplyTo(Message message, final SimpleString dest)
+   {
+
+      if (dest == null)
+      {
+         message.removeProperty(REPLYTO_HEADER_NAME);
+      }
+      else
+      {
+
+         message.putStringProperty(REPLYTO_HEADER_NAME, dest);
+      }
+   }
+
+
+
+   public static void clearProperties(Message message)
+   {
+
+      List<SimpleString> toRemove = new ArrayList<SimpleString>();
+
+      for (SimpleString propName : message.getPropertyNames())
+      {
+         if (!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
+            propName.startsWith(JMS_))
+         {
+            toRemove.add(propName);
+         }
+      }
+
+      for (SimpleString propName : toRemove)
+      {
+         message.removeProperty(propName);
+      }
+   }
+
+
+
+   public static Set<String> getPropertyNames(Message message)
+   {
+      HashSet<String> set = new HashSet<String>();
+
+      for (SimpleString propName : message.getPropertyNames())
+      {
+         if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
+            propName.startsWith(JMS_)) && 
!propName.startsWith(CONNECTION_ID_PROPERTY_NAME))
+         {
+            set.add(propName.toString());
+         }
+      }
+
+      set.add(JMSXDELIVERYCOUNT);
+
+      return set;
+   }
+
+   public static boolean propertyExists(Message message, String name)
+   {
+      return message.containsProperty(new SimpleString(name)) || 
name.equals(MessageUtil.JMSXDELIVERYCOUNT) ||
+         MessageUtil.JMSXGROUPID.equals(name) &&
+            
message.containsProperty(org.apache.activemq6.api.core.Message.HDR_GROUP_ID);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/StreamMessageUtil.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/StreamMessageUtil.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/StreamMessageUtil.java
new file mode 100644
index 0000000..b92132e
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/StreamMessageUtil.java
@@ -0,0 +1,300 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.reader;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.utils.DataConstants;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class StreamMessageUtil extends MessageUtil
+{
+   /**
+    * Method to read boolean values out of the Stream protocol existent on JMS 
Stream Messages
+    * Throws IllegalStateException if the type was invalid
+    *
+    * @param message
+    * @return
+    */
+   public static boolean streamReadBoolean(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+
+      switch (type)
+      {
+         case DataConstants.BOOLEAN:
+            return buff.readBoolean();
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Boolean.valueOf(s);
+         default:
+            throw new IllegalStateException("Invalid conversion, type byte was 
" + type);
+      }
+
+   }
+
+   public static byte streamReadByte(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      int index = buff.readerIndex();
+      try
+      {
+         byte type = buff.readByte();
+         switch (type)
+         {
+            case DataConstants.BYTE:
+               return buff.readByte();
+            case DataConstants.STRING:
+               String s = buff.readNullableString();
+               return Byte.parseByte(s);
+            default:
+               throw new IllegalStateException("Invalid conversion");
+         }
+      }
+      catch (NumberFormatException e)
+      {
+         buff.readerIndex(index);
+         throw e;
+      }
+
+   }
+
+   public static short streamReadShort(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.BYTE:
+            return buff.readByte();
+         case DataConstants.SHORT:
+            return buff.readShort();
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Short.parseShort(s);
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+   }
+
+   public static char streamReadChar(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.CHAR:
+            return (char)buff.readShort();
+         case DataConstants.STRING:
+            String str = buff.readNullableString();
+            if (str == null)
+            {
+               throw new NullPointerException("Invalid conversion");
+            }
+            else
+            {
+               throw new IllegalStateException("Invalid conversion");
+            }
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+
+   }
+
+   public static int streamReadInteger(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.BYTE:
+            return buff.readByte();
+         case DataConstants.SHORT:
+            return buff.readShort();
+         case DataConstants.INT:
+            return buff.readInt();
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Integer.parseInt(s);
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+   }
+
+
+   public static long streamReadLong(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.BYTE:
+            return buff.readByte();
+         case DataConstants.SHORT:
+            return buff.readShort();
+         case DataConstants.INT:
+            return buff.readInt();
+         case DataConstants.LONG:
+            return buff.readLong();
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Long.parseLong(s);
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+   }
+
+   public static float streamReadFloat(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.FLOAT:
+            return Float.intBitsToFloat(buff.readInt());
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Float.parseFloat(s);
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+   }
+
+
+   public static double streamReadDouble(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.FLOAT:
+            return Float.intBitsToFloat(buff.readInt());
+         case DataConstants.DOUBLE:
+            return Double.longBitsToDouble(buff.readLong());
+         case DataConstants.STRING:
+            String s = buff.readNullableString();
+            return Double.parseDouble(s);
+         default:
+            throw new IllegalStateException("Invalid conversion: " + type);
+      }
+   }
+
+
+   public static String streamReadString(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.BOOLEAN:
+            return String.valueOf(buff.readBoolean());
+         case DataConstants.BYTE:
+            return String.valueOf(buff.readByte());
+         case DataConstants.SHORT:
+            return String.valueOf(buff.readShort());
+         case DataConstants.CHAR:
+            return String.valueOf((char)buff.readShort());
+         case DataConstants.INT:
+            return String.valueOf(buff.readInt());
+         case DataConstants.LONG:
+            return String.valueOf(buff.readLong());
+         case DataConstants.FLOAT:
+            return String.valueOf(Float.intBitsToFloat(buff.readInt()));
+         case DataConstants.DOUBLE:
+            return String.valueOf(Double.longBitsToDouble(buff.readLong()));
+         case DataConstants.STRING:
+            return buff.readNullableString();
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+   }
+
+   /**
+    * Utility for reading bytes out of streaming.
+    * It will return remainingBytes, bytesRead
+    * @param remainingBytes remaining Bytes from previous read. Send it to 0 
if it was the first call for the message
+    * @param message
+    * @return a pair of remaining bytes and bytes read
+    */
+   public static Pair<Integer, Integer> streamReadBytes(Message message, int 
remainingBytes, byte[] value)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+
+      if (remainingBytes == -1)
+      {
+         return new Pair<>(0, -1);
+      }
+      else if (remainingBytes == 0)
+      {
+         byte type = buff.readByte();
+         if (type != DataConstants.BYTES)
+         {
+            throw new IllegalStateException("Invalid conversion");
+         }
+         remainingBytes = buff.readInt();
+      }
+      int read = Math.min(value.length, remainingBytes);
+      buff.readBytes(value, 0, read);
+      remainingBytes -= read;
+      if (remainingBytes == 0)
+      {
+         remainingBytes = -1;
+      }
+      return new Pair<>(remainingBytes, read);
+
+   }
+
+   public static Object streamReadObject(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+
+      byte type = buff.readByte();
+      switch (type)
+      {
+         case DataConstants.BOOLEAN:
+            return buff.readBoolean();
+         case DataConstants.BYTE:
+            return buff.readByte();
+         case DataConstants.SHORT:
+            return buff.readShort();
+         case DataConstants.CHAR:
+            return (char)buff.readShort();
+         case DataConstants.INT:
+            return buff.readInt();
+         case DataConstants.LONG:
+            return buff.readLong();
+         case DataConstants.FLOAT:
+            return Float.intBitsToFloat(buff.readInt());
+         case DataConstants.DOUBLE:
+            return Double.longBitsToDouble(buff.readLong());
+         case DataConstants.STRING:
+            return buff.readNullableString();
+         case DataConstants.BYTES:
+            int bufferLen = buff.readInt();
+            byte[] bytes = new byte[bufferLen];
+            buff.readBytes(bytes);
+            return bytes;
+         default:
+            throw new IllegalStateException("Invalid conversion");
+      }
+
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/TextMessageUtil.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/TextMessageUtil.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/TextMessageUtil.java
new file mode 100644
index 0000000..8821e4e
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/TextMessageUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.reader;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class TextMessageUtil extends MessageUtil
+{
+
+   /**
+    * Utility method to set the Text message on a message body
+    */
+   public static void writeBodyText(Message message, SimpleString text)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      buff.clear();
+      buff.writeNullableSimpleString(text);
+   }
+
+   /**
+    * Utility method to set the Text message on a message body
+    */
+   public static SimpleString readBodyText(Message message)
+   {
+      HornetQBuffer buff = getBodyBuffer(message);
+      buff.resetReaderIndex();
+      return buff.readNullableSimpleString();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/package-info.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/package-info.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/package-info.java
new file mode 100644
index 0000000..55cf5b5
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+/**
+ * Provides reading methods for JMS like objects.
+ * This isolates the logic from the JMS client case you need this kind of 
functionality from core.
+ * This is also used on conversions between protocols such as AMQP and Core 
where
+ * this is done at the server's
+ * @author Clebert Suconic
+ */
+package org.apache.activemq6.reader;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/AbstractRemotingConnection.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/AbstractRemotingConnection.java
new file mode 100644
index 0000000..e826e0b
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/AbstractRemotingConnection.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.spi.core.protocol;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQInterruptedException;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.core.remoting.CloseListener;
+import org.apache.activemq6.core.remoting.FailureListener;
+import org.apache.activemq6.spi.core.remoting.Connection;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public abstract class AbstractRemotingConnection implements RemotingConnection
+{
+   protected final List<FailureListener> failureListeners = new 
CopyOnWriteArrayList<FailureListener>();
+   protected final List<CloseListener> closeListeners = new 
CopyOnWriteArrayList<CloseListener>();
+   protected final Connection transportConnection;
+   protected final Executor executor;
+   protected final long creationTime;
+   protected volatile boolean dataReceived;
+
+   public AbstractRemotingConnection(final Connection transportConnection, 
final Executor executor)
+   {
+      this.transportConnection = transportConnection;
+      this.executor = executor;
+      this.creationTime = System.currentTimeMillis();
+   }
+
+   public List<FailureListener> getFailureListeners()
+   {
+      return new ArrayList<FailureListener>(failureListeners);
+   }
+
+   protected void callFailureListeners(final HornetQException me, String 
scaleDownTargetNodeID)
+   {
+      final List<FailureListener> listenersClone = new 
ArrayList<FailureListener>(failureListeners);
+
+      for (final FailureListener listener : listenersClone)
+      {
+         try
+         {
+            listener.connectionFailed(me, false, scaleDownTargetNodeID);
+         }
+         catch (HornetQInterruptedException interrupted)
+         {
+            // this is an expected behaviour.. no warn or error here
+            HornetQClientLogger.LOGGER.debug("thread interrupted", 
interrupted);
+         }
+         catch (final Throwable t)
+         {
+            // Failure of one listener to execute shouldn't prevent others
+            // from
+            // executing
+            HornetQClientLogger.LOGGER.errorCallingFailureListener(t);
+         }
+      }
+   }
+
+
+   protected void callClosingListeners()
+   {
+      final List<CloseListener> listenersClone = new 
ArrayList<CloseListener>(closeListeners);
+
+      for (final CloseListener listener : listenersClone)
+      {
+         try
+         {
+            listener.connectionClosed();
+         }
+         catch (final Throwable t)
+         {
+            // Failure of one listener to execute shouldn't prevent others
+            // from
+            // executing
+            HornetQClientLogger.LOGGER.errorCallingFailureListener(t);
+         }
+      }
+   }
+
+   public void setFailureListeners(final List<FailureListener> listeners)
+   {
+      failureListeners.clear();
+
+      failureListeners.addAll(listeners);
+   }
+
+   public Object getID()
+   {
+      return transportConnection.getID();
+   }
+
+   public String getRemoteAddress()
+   {
+      return transportConnection.getRemoteAddress();
+   }
+
+   public void addFailureListener(final FailureListener listener)
+   {
+      if (listener == null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull();
+      }
+      failureListeners.add(listener);
+   }
+
+   public boolean removeFailureListener(final FailureListener listener)
+   {
+      if (listener == null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull();
+      }
+
+      return failureListeners.remove(listener);
+   }
+
+   public void addCloseListener(final CloseListener listener)
+   {
+      if (listener == null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull();
+      }
+
+      closeListeners.add(listener);
+   }
+
+   public boolean removeCloseListener(final CloseListener listener)
+   {
+      if (listener == null)
+      {
+         throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull();
+      }
+
+      return closeListeners.remove(listener);
+   }
+
+   public List<CloseListener> removeCloseListeners()
+   {
+      List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
+
+      closeListeners.clear();
+
+      return ret;
+   }
+
+   public List<FailureListener> removeFailureListeners()
+   {
+      List<FailureListener> ret = getFailureListeners();
+
+      failureListeners.clear();
+
+      return ret;
+   }
+
+   public void setCloseListeners(List<CloseListener> listeners)
+   {
+      closeListeners.clear();
+
+      closeListeners.addAll(listeners);
+   }
+
+   public HornetQBuffer createBuffer(final int size)
+   {
+      return transportConnection.createBuffer(size);
+   }
+
+   public Connection getTransportConnection()
+   {
+      return transportConnection;
+   }
+
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
+
+   public boolean checkDataReceived()
+   {
+      boolean res = dataReceived;
+
+      dataReceived = false;
+
+      return res;
+   }
+
+   /*
+    * This can be called concurrently by more than one thread so needs to be 
locked
+    */
+   public void fail(final HornetQException me)
+   {
+      fail(me, null);
+   }
+
+   public void bufferReceived(final Object connectionID, final HornetQBuffer 
buffer)
+   {
+      dataReceived = true;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/ConnectionEntry.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/ConnectionEntry.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/ConnectionEntry.java
new file mode 100644
index 0000000..2ebc470
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/ConnectionEntry.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.spi.core.protocol;
+
+import java.util.concurrent.Executor;
+
+
+/**
+ * A ConnectionEntry
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class ConnectionEntry
+{
+   public final RemotingConnection connection;
+
+   public volatile long lastCheck;
+
+   public volatile long ttl;
+
+   public final Executor connectionExecutor;
+
+   public Object getID()
+   {
+      return connection.getID();
+   }
+
+   public ConnectionEntry(final RemotingConnection connection, final Executor 
connectionExecutor, final long lastCheck, final long ttl)
+   {
+      this.connection = connection;
+
+      this.lastCheck = lastCheck;
+
+      this.ttl = ttl;
+
+      this.connectionExecutor = connectionExecutor;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/RemotingConnection.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/RemotingConnection.java
new file mode 100644
index 0000000..51a048d
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/RemotingConnection.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.spi.core.protocol;
+
+import java.util.List;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.core.remoting.CloseListener;
+import org.apache.activemq6.core.remoting.FailureListener;
+import org.apache.activemq6.spi.core.remoting.BufferHandler;
+import org.apache.activemq6.spi.core.remoting.Connection;
+
+/**
+ * A RemotingConnection is a connection between a client and a server.
+ *
+ *
+ * Perhaps a better name for this class now would be ProtocolConnection as this
+ * represents the link with the used protocol
+ *
+ * @author <a href="mailto:[email protected]";>Tim Fox</a>
+ * @author <a href="mailto:[email protected]";>Jeff Mesnil</a>
+ */
+public interface RemotingConnection extends BufferHandler
+{
+   /**
+    * Returns the unique id of the {@link RemotingConnection}.
+    * @return the id
+    */
+   Object getID();
+
+   /**
+    * Returns the creation time of the {@link RemotingConnection}.
+    */
+   long getCreationTime();
+
+   /**
+    * returns a string representation of the remote address of this connection
+    *
+    * @return the remote address
+    */
+   String getRemoteAddress();
+
+   /**
+    * add a failure listener.
+    * <p>
+    * The listener will be called in the event of connection failure.
+    *
+    * @param listener the listener
+    */
+   void addFailureListener(FailureListener listener);
+
+   /**
+    * remove the failure listener
+    *
+    * @param listener the lister to remove
+    * @return true if removed
+    */
+   boolean removeFailureListener(FailureListener listener);
+
+   /**
+    * add a CloseListener.
+    * <p>
+    * This will be called in the event of the connection being closed.
+    *
+    * @param listener the listener to add
+    */
+   void addCloseListener(CloseListener listener);
+
+   /**
+    * remove a Close Listener
+    *
+    * @param listener the listener to remove
+    * @return true if removed
+    */
+   boolean removeCloseListener(CloseListener listener);
+
+   List<CloseListener> removeCloseListeners();
+
+   void setCloseListeners(List<CloseListener> listeners);
+
+
+   /**
+    * return all the failure listeners
+    *
+    * @return the listeners
+    */
+   List<FailureListener> getFailureListeners();
+
+   List<FailureListener> removeFailureListeners();
+
+
+   /**
+    * set the failure listeners.
+    * <p>
+    * These will be called in the event of the connection being closed. Any 
previosuly added listeners will be removed.
+    *
+    * @param listeners the listeners to add.
+    */
+   void setFailureListeners(List<FailureListener> listeners);
+
+   /**
+    * creates a new HornetQBuffer of the specified size.
+    *
+    * @param size the size of buffer required
+    * @return the buffer
+    */
+   HornetQBuffer createBuffer(int size);
+
+   /**
+    * called when the underlying connection fails.
+    *
+    * @param me the exception that caused the failure
+    */
+   void fail(HornetQException me);
+
+   /**
+    * called when the underlying connection fails.
+    *
+    * @param me the exception that caused the failure
+    * @param scaleDownTargetNodeID the ID of the node where scale down is 
targeted
+    */
+   void fail(HornetQException me, String scaleDownTargetNodeID);
+
+   /**
+    * destroys this connection.
+    */
+   void destroy();
+
+   /**
+    * return the underlying Connection.
+    *
+    * @return the connection
+    */
+   Connection getTransportConnection();
+
+   /**
+    * Returns whether or not the {@link RemotingConnection} is a client
+    * @return true if client, false if a server
+    */
+   boolean isClient();
+
+   /**
+    * Returns true if this {@link RemotingConnection} has been destroyed.
+    * @return true if destroyed, otherwise false
+    */
+   boolean isDestroyed();
+
+   /**
+    * Disconnect the connection, closing all channels
+    */
+   void disconnect(boolean criticalError);
+
+   /**
+    * Disconnect the connection, closing all channels
+    */
+   void disconnect(String scaleDownNodeID, boolean criticalError);
+
+   /**
+    * returns true if any data has been received since the last time this 
method was called.
+    *
+    * @return true if data has been received.
+    */
+   boolean checkDataReceived();
+
+   /**
+    * flush all outstanding data from the connection.
+    */
+   void flush();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/AbstractConnector.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/AbstractConnector.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/AbstractConnector.java
new file mode 100644
index 0000000..98c477c
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/AbstractConnector.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.spi.core.remoting;
+
+import java.util.Map;
+
+/**
+ * Abstract connector
+ *
+ * @author <a href="mailto:[email protected]";>Howard Gao</a>
+ */
+public abstract class AbstractConnector implements Connector
+{
+   protected final Map<String, Object> configuration;
+
+   protected AbstractConnector(Map<String, Object> configuration)
+   {
+      this.configuration = configuration;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferDecoder.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferDecoder.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferDecoder.java
new file mode 100644
index 0000000..23e853c
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferDecoder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.spi.core.remoting;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+
+/**
+ * A BufferDecoder
+ *
+ * @author tim
+ *
+ *
+ */
+public interface BufferDecoder
+{
+   /**
+    * called by the remoting system prior to {@link 
org.apache.activemq6.spi.core.remoting.BufferHandler#bufferReceived(Object, 
org.hornetq.api.core.HornetQBuffer)}.
+    * <p>
+    * The implementation should return true if there is enough data in the 
buffer to decode. otherwise false.
+    *                  * @param buffer the buffer
+    * @return true id the buffer can be decoded..
+    */
+   int isReadyToHandle(HornetQBuffer buffer);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferHandler.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferHandler.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferHandler.java
new file mode 100644
index 0000000..3f3ae91
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.spi.core.remoting;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+
+/**
+ * A BufferHandler that will handle buffers received by an acceptor.
+ * <p>
+ * The Buffer Handler will decode the buffer and take the appropriate action, 
typically forwarding to the correct channel.
+ *
+ * @author <a href="mailto:[email protected]";>Tim Fox</a>
+ */
+public interface BufferHandler
+{
+   /**
+    * called by the remoting connection when a buffer is received.
+    *
+    * @param connectionID the connection the buffer was received on
+    * @param buffer       the buffer to decode
+    */
+   void bufferReceived(Object connectionID, HornetQBuffer buffer);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManager.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManager.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManager.java
new file mode 100644
index 0000000..f11faf1
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManager.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.spi.core.remoting;
+
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+
+import io.netty.channel.ChannelPipeline;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.Interceptor;
+import org.apache.activemq6.api.core.client.ClientSessionFactory;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public interface ClientProtocolManager
+{
+
+   /// Life Cycle Methods:
+
+   RemotingConnection connect(Connection transportConnection, long 
callTimeout, long callFailoverTimeout, List<Interceptor> incomingInterceptors, 
List<Interceptor> outgoingInterceptors, TopologyResponseHandler 
topologyResponseHandler);
+
+   RemotingConnection getCurrentConnection();
+
+   Lock lockSessionCreation();
+
+   boolean waitOnLatch(long milliseconds) throws InterruptedException;
+
+   /**
+    * This is to be called when a connection failed and we want to interrupt 
any communication.
+    * This used to be called exitLoop at some point o the code.. with a method 
named causeExit from ClientSessionFactoryImpl
+    */
+   void stop();
+
+   boolean isAlive();
+
+   /// Sending methods
+
+   void addChannelHandlers(ChannelPipeline pipeline);
+
+   void sendSubscribeTopology(boolean isServer);
+
+   void ping(long connectionTTL);
+
+   SessionContext createSessionContext(final String name,
+                                       final String username,
+                                       final String password,
+                                       final boolean xa,
+                                       final boolean autoCommitSends,
+                                       final boolean autoCommitAcks,
+                                       final boolean preAcknowledge,
+                                       int minLargeMessageSize,
+                                       int confirmationWindowSize) throws 
HornetQException;
+
+   boolean cleanupBeforeFailover(HornetQException cause);
+
+   boolean checkForFailover(String liveNodeID) throws HornetQException;
+
+   void setSessionFactory(ClientSessionFactory factory);
+
+   ClientSessionFactory getSessionFactory();
+
+   String getName();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManagerFactory.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManagerFactory.java
new file mode 100644
index 0000000..49e81f9
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManagerFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.spi.core.remoting;
+
+import java.io.Serializable;
+
+/**
+ * @author Clebert Suconic
+ */
+public interface ClientProtocolManagerFactory extends Serializable
+{
+
+   ClientProtocolManager newProtocolManager();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git 
a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connection.java
 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connection.java
new file mode 100644
index 0000000..5bd747b
--- /dev/null
+++ 
b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connection.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.spi.core.remoting;
+
+import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.core.security.HornetQPrincipal;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+
+/**
+ * The connection used by a channel to write data to.
+ *
+ * @author <a href="mailto:[email protected]";>Tim Fox</a>
+ * @author Clebert Suconic
+ */
+public interface Connection
+{
+   /**
+    * Create a new HornetQBuffer of the given size.
+    *
+    * @param size the size of buffer to create
+    * @return the new buffer.
+    */
+   HornetQBuffer createBuffer(int size);
+
+
+   RemotingConnection getProtocolConnection();
+
+   void setProtocolConnection(RemotingConnection connection);
+
+   /**
+    * returns the unique id of this wire.
+    *
+    * @return the id
+    */
+   Object getID();
+
+   /**
+    * writes the buffer to the connection and if flush is true returns only 
when the buffer has been physically written to the connection.
+    *
+    * @param buffer the buffer to write
+    * @param flush  whether to flush the buffers onto the wire
+    * @param batched whether the packet is allowed to batched for better 
performance
+    */
+   void write(HornetQBuffer buffer, boolean flush, boolean batched);
+
+   /**
+    * writes the buffer to the connection and if flush is true returns only 
when the buffer has been physically written to the connection.
+    *
+    * @param buffer the buffer to write
+    * @param flush  whether to flush the buffers onto the wire
+    * @param batched whether the packet is allowed to batched for better 
performance
+    */
+   void write(HornetQBuffer buffer, boolean flush, boolean batched, 
ChannelFutureListener futureListener);
+
+   /**
+    * writes the buffer to the connection with no flushing or batching
+    *
+    * @param buffer the buffer to write
+    */
+   void write(HornetQBuffer buffer);
+
+
+   /**
+    * This should close the internal channel without calling any listeners.
+    * This is to avoid a situation where the broker is busy writing on an 
internal thread.
+    * This should close the socket releasing any pending threads.
+    */
+   void forceClose();
+
+   /**
+    * Closes the connection.
+    */
+   void close();
+
+   /**
+    * Returns a string representation of the remote address this connection is 
connected to.
+    * @return the remote address
+    */
+   String getRemoteAddress();
+
+   /**
+    * Called periodically to flush any data in the batch buffer
+    */
+   void checkFlushBatchBuffer();
+
+   void addReadyListener(ReadyListener listener);
+
+   void removeReadyListener(ReadyListener listener);
+
+   /**
+    * Generates a {@link TransportConfiguration} to be used to connect to the 
same target this is
+    * connected to.
+    * @return TransportConfiguration
+    */
+   TransportConfiguration getConnectorConfig();
+
+   HornetQPrincipal getDefaultHornetQPrincipal();
+
+   /**
+    * the InVM Connection has some special handling as it doesn't use Netty 
ProtocolChannel
+    * we will use this method Instead of using instanceof
+    * @return
+    */
+   boolean isUsingProtocolHandling();
+}
\ No newline at end of file

Reply via email to