http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManager.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManager.java
new file mode 100644
index 0000000..320651e
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManager.java
@@ -0,0 +1,756 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.InvalidClientIDException;
+
+import io.netty.channel.ChannelPipeline;
+
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.state.ConnectionState;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.state.SessionState;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.InetAddressUtil;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.protocol.openwire.amq.AMQConnectionContext;
+import org.hornetq.core.protocol.openwire.amq.AMQPersistenceAdapter;
+import org.hornetq.core.protocol.openwire.amq.AMQProducerBrokerExchange;
+import org.hornetq.core.protocol.openwire.amq.AMQServerSession;
+import org.hornetq.core.protocol.openwire.amq.AMQSession;
+import org.hornetq.core.protocol.openwire.amq.AMQTransportConnectionState;
+import org.hornetq.core.remoting.impl.netty.NettyServerConnection;
+import org.hornetq.core.security.CheckType;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.MessageConverter;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
+import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+
+public class OpenWireProtocolManager implements ProtocolManager
+{
+   /** Source of the value used for this broker's {@link BrokerId}. */
+   private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
+   /** Source of the connection id assigned to the advisory producer. */
+   private static final IdGenerator ID_GENERATOR = new IdGenerator();
+
+   /** Supplies the sequence part of advisory message ids. */
+   private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+   private final HornetQServer server;
+
+   /** Creates the wire format handed to each new connection; assigned once in the constructor. */
+   private final OpenWireFormatFactory wireFactory;
+
+   private boolean tightEncodingEnabled = true;
+
+   /** When set, {@code isProtocol} skips the first four bytes before inspecting the header. */
+   private boolean prefixPacketSize = true;
+
+   private BrokerState brokerState;
+
+   /** Created lazily by {@code getBrokerId()}. */
+   private BrokerId brokerId;
+   protected final ProducerId advisoryProducerId = new ProducerId();
+
+   // from broker
+   protected final Map<ConnectionId, ConnectionState> brokerConnectionStates = Collections
+         .synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
+
+   private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>();
+
+   protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
+
+   /** Client id to owning connection context; {@code addConnection} synchronizes on this map while using it. */
+   private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
+
+   /** Resolved lazily by {@code getBrokerName()}. */
+   private String brokerName;
+
+   /** Sessions registered through {@code addSession}, keyed by session id; never reassigned. */
+   private final Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<SessionId, AMQSession>();
+
+   /** Transactions registered through {@code registerTx}, mapped to their session; never reassigned. */
+   private final Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<TransactionId, AMQSession>();
+
+   public OpenWireProtocolManager(HornetQServer server)
+   {
+      this.server = server;
+      this.wireFactory = new OpenWireFormatFactory();
+      // preferred prop, should be done via config
+      wireFactory.setCacheEnabled(false);
+      brokerState = new BrokerState();
+      advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
+   }
+
+   /**
+    * Wraps a transport connection in a new, initialised {@link OpenWireConnection}
+    * that uses its own wire format instance.
+    *
+    * @param acceptorUsed the acceptor the connection came in on
+    * @param connection   the underlying transport connection
+    * @return a connection entry built from the new connection, a {@code null}
+    *         second argument, the current time and a value of one minute in
+    *         milliseconds
+    */
+   @Override
+   public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed,
+         Connection connection)
+   {
+      final int oneMinuteMillis = 1 * 60 * 1000;
+      final OpenWireFormat wireFormat = (OpenWireFormat) wireFactory.createWireFormat();
+
+      final OpenWireConnection openWireConnection = new OpenWireConnection(
+            acceptorUsed, connection, this, wireFormat);
+      openWireConnection.init();
+
+      return new ConnectionEntry(openWireConnection, null,
+            System.currentTimeMillis(), oneMinuteMillis);
+   }
+
+   /**
+    * @return a new {@link OpenWireMessageConverter}; a fresh instance is
+    *         created on every call
+    */
+   @Override
+   public MessageConverter getConverter()
+   {
+      return new OpenWireMessageConverter();
+   }
+
+   /**
+    * Not implemented: the call is accepted and ignored.
+    *
+    * @param name unused
+    */
+   @Override
+   public void removeHandler(String name)
+   {
+      // TODO: not implemented yet, currently a no-op
+   }
+
+   /**
+    * No-op in this implementation: neither the connection nor the buffer is
+    * touched.
+    *
+    * @param connection unused
+    * @param buffer     unused
+    */
+   @Override
+   public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
+   {
+   }
+
+   /**
+    * Not implemented: no handlers are added to the pipeline.
+    *
+    * @param pipeline unused
+    */
+   @Override
+   public void addChannelHandlers(ChannelPipeline pipeline)
+   {
+      // TODO: not implemented yet, currently a no-op
+
+   }
+
+   /**
+    * Tells whether the given leading bytes look like the start of an OpenWire
+    * {@link WireFormatInfo} command.
+    * <p>
+    * When {@code prefixPacketSize} is set the first four bytes are skipped
+    * (presumably the packet size prefix). The next byte must equal
+    * {@link WireFormatInfo#DATA_STRUCTURE_TYPE}, and the bytes after it must
+    * match the beginning of the wire format magic for as many bytes as the
+    * array still holds.
+    *
+    * @param array the first bytes read from a connection
+    * @return {@code true} when type byte and magic prefix match
+    * @throws IllegalArgumentException when fewer than 8 bytes are supplied
+    */
+   @Override
+   public boolean isProtocol(byte[] array)
+   {
+      if (array.length < 8)
+      {
+         throw new IllegalArgumentException("Protocol header length changed "
+               + array.length);
+      }
+
+      final int typeIndex = this.prefixPacketSize ? 4 : 0;
+      // type
+      if (array[typeIndex] != WireFormatInfo.DATA_STRUCTURE_TYPE)
+      {
+         return false;
+      }
+
+      final int magicStart = typeIndex + 1;
+      final byte[] magic = new WireFormatInfo().getMagic();
+      // compare only as many magic bytes as the array can still provide
+      final int compareLen = Math.min(magic.length, array.length - magicStart);
+      // magic
+      for (int offset = 0; offset < compareLen; offset++)
+      {
+         if (array[magicStart + offset] != magic[offset])
+         {
+            return false;
+         }
+      }
+      return true;
+   }
+
+   /**
+    * Not implemented: no handshake work is performed.
+    *
+    * @param connection unused
+    * @param buffer     unused
+    */
+   @Override
+   public void handshake(NettyServerConnection connection, HornetQBuffer buffer)
+   {
+      // TODO: not implemented yet, currently a no-op
+
+   }
+
+   /**
+    * Accepts {@code CONNECTION_INFO} commands (without doing anything with
+    * them) and rejects every other command type.
+    *
+    * @param openWireConnection the connection the command arrived on (unused)
+    * @param command            the received command; must be a {@link Command}
+    * @throws IllegalStateException for any command type other than
+    *                               {@code CONNECTION_INFO}
+    */
+   public void handleCommand(OpenWireConnection openWireConnection,
+         Object command)
+   {
+      final byte commandType = ((Command) command).getDataStructureType();
+      if (commandType == CommandTypes.CONNECTION_INFO)
+      {
+         return;
+      }
+      throw new IllegalStateException("Cannot handle command: " + command);
+   }
+
+   /**
+    * Registers a callback with the storage manager through
+    * {@code afterCompleteOperations}. When the callback's {@code done()} runs
+    * the command is passed to {@link #send}; {@code onError} only logs.
+    *
+    * @param connection the connection to reply on
+    * @param command    the reply to send
+    */
+   public void sendReply(final OpenWireConnection connection,
+         final Command command)
+   {
+      final IOAsyncTask replyTask = new IOAsyncTask()
+      {
+         public void done()
+         {
+            send(connection, command);
+         }
+
+         public void onError(final int errorCode, final String errorMessage)
+         {
+            HornetQServerLogger.LOGGER.errorProcessingIOCallback(errorCode,
+                  errorMessage);
+         }
+      };
+      server.getStorageManager().afterCompleteOperations(replyTask);
+   }
+
+   /**
+    * Sends a command on the given connection while holding the connection's
+    * monitor.
+    * <p>
+    * Sending is best effort: any failure is logged at debug level and reported
+    * through the return value instead of being propagated.
+    *
+    * @param connection the connection to write to
+    * @param command    the command to send
+    * @return {@code true} if the command was handed to the connection,
+    *         {@code false} if the connection is destroyed or the send failed
+    */
+   public boolean send(final OpenWireConnection connection, final Command command)
+   {
+      if (HornetQServerLogger.LOGGER.isTraceEnabled())
+      {
+         HornetQServerLogger.LOGGER.trace("sending " + command);
+      }
+      synchronized (connection)
+      {
+         if (connection.isDestroyed())
+         {
+            return false;
+         }
+
+         try
+         {
+            connection.physicalSend(command);
+            return true;
+         }
+         catch (Throwable t)
+         {
+            // Throwable already covers Exception; keep the best-effort
+            // contract but record why the send failed.
+            HornetQServerLogger.LOGGER.debug("Failed to send " + command, t);
+            return false;
+         }
+      }
+   }
+
+   /**
+    * @return the manager's own connection state map (the live synchronized
+    *         map, not a copy)
+    */
+   public Map<ConnectionId, ConnectionState> getConnectionStates()
+   {
+      return this.brokerConnectionStates;
+   }
+
+   public void addConnection(AMQConnectionContext context, ConnectionInfo 
info) throws Exception
+   {
+      String username = info.getUserName();
+      String password = info.getPassword();
+
+      if (!this.validateUser(username, password))
+      {
+         throw new SecurityException("User name [" + username + "] or password 
is invalid.");
+      }
+      String clientId = info.getClientId();
+      if (clientId == null)
+      {
+         throw new InvalidClientIDException(
+               "No clientID specified for connection request");
+      }
+      synchronized (clientIdSet)
+      {
+         AMQConnectionContext oldContext = clientIdSet.get(clientId);
+         if (oldContext != null)
+         {
+            if (context.isAllowLinkStealing())
+            {
+               clientIdSet.remove(clientId);
+               if (oldContext.getConnection() != null)
+               {
+                  OpenWireConnection connection = oldContext.getConnection();
+                  connection.disconnect(true);
+               }
+               else
+               {
+                  // log error
+               }
+            }
+            else
+            {
+               throw new InvalidClientIDException("Broker: " + getBrokerName()
+                     + " - Client: " + clientId + " already connected from "
+                     + oldContext.getConnection().getRemoteAddress());
+            }
+         }
+         else
+         {
+            clientIdSet.put(clientId, context);
+         }
+      }
+
+      connections.add(context.getConnection());
+
+      ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
+      // do not distribute passwords in advisory messages. usernames okay
+      ConnectionInfo copy = info.copy();
+      copy.setPassword("");
+      fireAdvisory(context, topic, copy);
+      connectionInfos.put(copy.getConnectionId(), copy);
+
+      // init the conn
+      addSessions(context.getConnection(), context.getConnectionState()
+            .getSessionIds());
+   }
+
+   /**
+    * Fires an advisory without a target consumer; delegates to the four
+    * argument overload with a {@code null} consumer id.
+    */
+   private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic,
+         Command copy) throws Exception
+   {
+      this.fireAdvisory(context, topic, copy, null);
+   }
+
+   /**
+    * Returns the broker id, generating it on first use.
+    *
+    * @return the broker id, never {@code null}
+    */
+   public BrokerId getBrokerId()
+   {
+      if (brokerId != null)
+      {
+         return brokerId;
+      }
+      brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
+      return brokerId;
+   }
+
+   /*
+    * See AdvisoryBroker.fireAdvisory()
+    *
+    * Builds a non-persistent advisory message that carries the given command
+    * as its data structure, addresses it to the given topic and sends it
+    * through the connection's advisory session, if the connection has one.
+    * Producer flow control on the context is switched off for the send and
+    * restored to its previous value afterwards.
+    *
+    * targetConsumerId may be null (the three argument overload passes null).
+    */
+   private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic,
+         Command command, ConsumerId targetConsumerId) throws Exception
+   {
+      ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+      advisoryMessage.setStringProperty(
+            AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
+      // getBrokerId() creates the id on demand, so the fallback is not expected to be used
+      String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
+      advisoryMessage.setStringProperty(
+            AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
+
+      // NOTE(review): the origin broker URL is hard-coded here
+      String url = "tcp://localhost:61616";
+
+      advisoryMessage.setStringProperty(
+            AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
+
+      // set the data structure
+      advisoryMessage.setDataStructure(command);
+      advisoryMessage.setPersistent(false);
+      advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+      advisoryMessage.setMessageId(new MessageId(advisoryProducerId,
+            messageIdGenerator.getNextSequenceId()));
+      advisoryMessage.setTargetConsumerId(targetConsumerId);
+      advisoryMessage.setDestination(topic);
+      advisoryMessage.setResponseRequired(false);
+      advisoryMessage.setProducerId(advisoryProducerId);
+      // remembered so the finally block can restore it
+      boolean originalFlowControl = context.isProducerFlowControl();
+      final AMQProducerBrokerExchange producerExchange = new 
AMQProducerBrokerExchange();
+      producerExchange.setConnectionContext(context);
+      producerExchange.setMutable(true);
+      producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
+      try
+      {
+         context.setProducerFlowControl(false);
+         AMQSession sess = context.getConnection().getAdvisorySession();
+         // without an advisory session the advisory is silently dropped
+         if (sess != null)
+         {
+            sess.send(producerExchange, advisoryMessage, false);
+         }
+      }
+      finally
+      {
+         context.setProducerFlowControl(originalFlowControl);
+      }
+   }
+
+   /**
+    * Returns the broker name, resolving it on first use from the local host
+    * name (lower-cased with {@link Locale#ENGLISH}). If the host name cannot
+    * be obtained, {@code "localhost"} is used.
+    *
+    * @return the broker name, never {@code null}
+    */
+   public String getBrokerName()
+   {
+      if (brokerName != null)
+      {
+         return brokerName;
+      }
+
+      String resolved;
+      try
+      {
+         resolved = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH);
+      }
+      catch (Exception e)
+      {
+         resolved = "localhost";
+      }
+      brokerName = resolved;
+      return brokerName;
+   }
+
+   /**
+    * @return always {@code false} in this implementation
+    */
+   public boolean isFaultTolerantConfiguration()
+   {
+      return false;
+   }
+
+   /**
+    * Not implemented: the dispatch is ignored.
+    *
+    * @param md unused
+    */
+   public void postProcessDispatch(MessageDispatch md)
+   {
+      // TODO: not implemented yet, currently a no-op
+
+   }
+
+   /**
+    * @return always {@code false}; stop tracking is not implemented yet
+    */
+   public boolean isStopped()
+   {
+      // TODO: not implemented yet
+      return false;
+   }
+
+   /**
+    * Not implemented: the dispatch is ignored.
+    *
+    * @param messageDispatch unused
+    */
+   public void preProcessDispatch(MessageDispatch messageDispatch)
+   {
+      // TODO: not implemented yet, currently a no-op
+
+   }
+
+   /**
+    * @return always {@code false} in this implementation
+    */
+   public boolean isStopping()
+   {
+      return false;
+   }
+
+   /**
+    * Creates a producer on the session named by the producer id and records it
+    * in the session state.
+    * <p>
+    * A producer id already present in the session state is ignored. For
+    * destinations that are not advisory topics the per-connection producer
+    * limit is enforced first. If recording the producer in the session state
+    * fails with an {@link IllegalStateException}, the producer just created on
+    * the session is removed again.
+    *
+    * @param theConn the connection the request arrived on
+    * @param info    the producer to add
+    * @throws IllegalStateException if the connection or session is not
+    *                               registered, the producer limit is reached,
+    *                               or no session exists for the session id
+    */
+   public void addProducer(OpenWireConnection theConn, ProducerInfo info)
+   {
+      final SessionId sessionId = info.getProducerId().getParentId();
+      final ConnectionId connectionId = sessionId.getParentId();
+
+      final AMQTransportConnectionState connState = theConn.lookupConnectionState(connectionId);
+      if (connState == null)
+      {
+         throw new IllegalStateException(
+               "Cannot add a producer to a connection that had not been registered: "
+                     + connectionId);
+      }
+
+      final SessionState sessionState = connState.getSessionState(sessionId);
+      if (sessionState == null)
+      {
+         throw new IllegalStateException(
+               "Cannot add a producer to a session that had not been registered: "
+                     + sessionId);
+      }
+
+      // Avoid replaying dup commands
+      if (sessionState.getProducerIds().contains(info.getProducerId()))
+      {
+         return;
+      }
+
+      final ActiveMQDestination destination = info.getDestination();
+      final boolean limited = destination != null
+            && !AdvisorySupport.isAdvisoryTopic(destination);
+      if (limited
+            && theConn.getProducerCount(connectionId) >= theConn.getMaximumProducersAllowedPerConnection())
+      {
+         throw new IllegalStateException(
+               "Can't add producer on connection " + connectionId
+                     + ": at maximum limit: "
+                     + theConn.getMaximumProducersAllowedPerConnection());
+      }
+
+      final AMQSession owningSession = sessions.get(sessionId);
+      if (owningSession == null)
+      {
+         throw new IllegalStateException("Session not exist! : " + sessionId);
+      }
+
+      owningSession.createProducer(info);
+
+      try
+      {
+         sessionState.addProducer(info);
+      }
+      catch (IllegalStateException e)
+      {
+         // the state refused the producer: undo the creation on the session
+         owningSession.removeProducer(info);
+      }
+   }
+
+   /**
+    * Creates a consumer on the session named by the consumer id, records it in
+    * the session state and adds a consumer broker exchange on the connection.
+    * <p>
+    * A consumer id already present in the session state is ignored. For
+    * destinations that are not advisory topics the per-connection consumer
+    * limit is enforced first. If the bookkeeping fails with an
+    * {@link IllegalStateException}, the consumer just created on the session
+    * is removed again.
+    *
+    * @param theConn the connection the request arrived on
+    * @param info    the consumer to add
+    * @throws IllegalStateException if the connection or session is not
+    *                               registered, the consumer limit is reached,
+    *                               or no session exists for the session id
+    * @throws Exception             if creating the consumer fails
+    */
+   public void addConsumer(OpenWireConnection theConn, ConsumerInfo info) throws Exception
+   {
+      // Todo: add a destination interceptors holder here (amq supports this)
+      final SessionId sessionId = info.getConsumerId().getParentId();
+      final ConnectionId connectionId = sessionId.getParentId();
+
+      final AMQTransportConnectionState connState = theConn.lookupConnectionState(connectionId);
+      if (connState == null)
+      {
+         throw new IllegalStateException(
+               "Cannot add a consumer to a connection that had not been registered: "
+                     + connectionId);
+      }
+
+      final SessionState sessionState = connState.getSessionState(sessionId);
+      if (sessionState == null)
+      {
+         throw new IllegalStateException(
+               this.server
+                     + " Cannot add a consumer to a session that had not been registered: "
+                     + sessionId);
+      }
+
+      // Avoid replaying dup commands
+      if (sessionState.getConsumerIds().contains(info.getConsumerId()))
+      {
+         return;
+      }
+
+      final ActiveMQDestination destination = info.getDestination();
+      final boolean limited = destination != null
+            && !AdvisorySupport.isAdvisoryTopic(destination);
+      if (limited
+            && theConn.getConsumerCount(connectionId) >= theConn.getMaximumConsumersAllowedPerConnection())
+      {
+         throw new IllegalStateException(
+               "Can't add consumer on connection " + connectionId
+                     + ": at maximum limit: "
+                     + theConn.getMaximumConsumersAllowedPerConnection());
+      }
+
+      final AMQSession owningSession = sessions.get(sessionId);
+      if (owningSession == null)
+      {
+         throw new IllegalStateException("Session not exist! : " + sessionId);
+      }
+
+      owningSession.createConsumer(info);
+
+      try
+      {
+         sessionState.addConsumer(info);
+         theConn.addConsumerBrokerExchange(info.getConsumerId());
+      }
+      catch (IllegalStateException e)
+      {
+         // bookkeeping refused the consumer: undo the creation on the session
+         owningSession.removeConsumer(info);
+      }
+   }
+
+   /**
+    * Creates an internal session for every given session id, using the
+    * session info stored in the connection's state.
+    *
+    * @param theConn    the connection owning the sessions
+    * @param sessionSet ids of the sessions to create
+    */
+   public void addSessions(OpenWireConnection theConn, Set<SessionId> sessionSet)
+   {
+      for (SessionId sessionId : sessionSet)
+      {
+         addSession(theConn,
+               theConn.getState().getSessionState(sessionId).getInfo(), true);
+      }
+   }
+
+   /**
+    * Creates and registers a session that is not flagged as internal;
+    * delegates to the three argument overload with {@code internal == false}.
+    *
+    * @return the newly created session
+    */
+   public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss)
+   {
+      return addSession(theConn, ss, false);
+   }
+
+   /**
+    * Creates a session for the given session info, initialises it, sets its
+    * internal flag and registers it under its session id (replacing any
+    * session previously stored for that id).
+    *
+    * @param theConn  the connection owning the session
+    * @param ss       the session info sent by the client
+    * @param internal value passed to the session's {@code setInternal}
+    * @return the newly created session
+    */
+   public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss,
+         boolean internal)
+   {
+      final AMQSession created = new AMQSession(
+            theConn.getState().getInfo(), ss, server, theConn, this);
+      created.initialize();
+      created.setInternal(internal);
+
+      sessions.put(ss.getSessionId(), created);
+      return created;
+   }
+
+   public void removeConnection(AMQConnectionContext context,
+         ConnectionInfo info, Throwable error)
+   {
+      // todo roll back tx
+      this.connections.remove(context.getConnection());
+      this.connectionInfos.remove(info.getConnectionId());
+      String clientId = info.getClientId();
+      if (clientId != null)
+      {
+         this.clientIdSet.remove(clientId);
+      }
+   }
+
+   /**
+    * Unregisters the session with the given info and closes it; does nothing
+    * when no such session is registered.
+    *
+    * @param context the connection context (unused)
+    * @param info    identifies the session to remove
+    * @throws Exception if closing the session fails
+    */
+   public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception
+   {
+      final AMQSession removed = sessions.remove(info.getSessionId());
+      if (removed == null)
+      {
+         return;
+      }
+      removed.close();
+   }
+
+   /**
+    * Removes a consumer from the session it belongs to. Does nothing when the
+    * session is not (or no longer) registered, matching
+    * {@code removeSession}.
+    *
+    * @param context the connection context (unused)
+    * @param info    the consumer to remove
+    * @throws Exception if the session fails to remove the consumer
+    */
+   public void removeConsumer(AMQConnectionContext context, ConsumerInfo info) throws Exception
+   {
+      SessionId sessionId = info.getConsumerId().getParentId();
+      AMQSession session = sessions.get(sessionId);
+      // the session may already have been removed; avoid a NullPointerException
+      if (session != null)
+      {
+         session.removeConsumer(info);
+      }
+   }
+
+   /**
+    * Removes a producer from the session it belongs to. Does nothing when the
+    * session is not (or no longer) registered, matching
+    * {@code removeSession}.
+    *
+    * @param id the id of the producer to remove
+    */
+   public void removeProducer(ProducerId id)
+   {
+      SessionId sessionId = id.getParentId();
+      AMQSession session = sessions.get(sessionId);
+      // the session may already have been removed; avoid a NullPointerException
+      if (session != null)
+      {
+         session.removeProducer(id);
+      }
+   }
+
+   /**
+    * @return always {@code null}; not implemented yet
+    */
+   public AMQPersistenceAdapter getPersistenceAdapter()
+   {
+      // TODO: not implemented yet
+      return null;
+   }
+
+   /**
+    * @param sessionId the id to look up
+    * @return the session registered under the id, or {@code null} if none
+    */
+   public AMQSession getSession(SessionId sessionId)
+   {
+      return sessions.get(sessionId);
+   }
+
+   /**
+    * Handles a request to add a destination.
+    * <p>
+    * For a queue, the core queue {@code "jms.queue." + physicalName} is
+    * created. When the connection state carries a connection info, a security
+    * check is made first with that user's credentials. Temporary queues are
+    * also registered on the connection. For every destination that is not an
+    * advisory topic a destination advisory is fired.
+    *
+    * @param connection the connection the request arrived on
+    * @param info       describes the destination to add
+    * @throws Exception if the security check, the queue creation or the
+    *                   advisory fails
+    */
+   public void addDestination(OpenWireConnection connection,
+         DestinationInfo info) throws Exception
+   {
+      final ActiveMQDestination dest = info.getDestination();
+
+      if (dest.isQueue())
+      {
+         final SimpleString queueName = new SimpleString("jms.queue."
+               + dest.getPhysicalName());
+         final ConnectionState connState = connection.brokerConnectionStates.get(info.getConnectionId());
+         final ConnectionInfo connectionInfo = connState.getInfo();
+         if (connectionInfo != null)
+         {
+            final String userName = connectionInfo.getUserName();
+            final String passcode = connectionInfo.getPassword();
+
+            // stand-in session that only carries the credentials for the check
+            final AMQServerSession credentials = new AMQServerSession(userName, passcode);
+            final CheckType required;
+            if (dest.isTemporary())
+            {
+               required = CheckType.CREATE_NON_DURABLE_QUEUE;
+            }
+            else
+            {
+               required = CheckType.CREATE_DURABLE_QUEUE;
+            }
+            ((HornetQServerImpl) server).getSecurityStore().check(queueName, required, credentials);
+         }
+         this.server.createQueue(queueName, queueName, null, false, true);
+         if (dest.isTemporary())
+         {
+            connection.registerTempQueue(queueName);
+         }
+      }
+
+      if (AdvisorySupport.isAdvisoryTopic(dest))
+      {
+         return;
+      }
+
+      final AMQConnectionContext context = connection.getConext();
+      final DestinationInfo advisoryInfo = new DestinationInfo(
+            context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
+      final ActiveMQTopic advisoryTopic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
+      fireAdvisory(context, advisoryTopic, advisoryInfo);
+   }
+
+   /**
+    * Destroys the queue with the given name on the server.
+    *
+    * @param q the queue name, passed to the server unchanged
+    * @throws Exception if the server fails to destroy the queue
+    */
+   public void deleteQueue(String q) throws Exception
+   {
+      server.destroyQueue(new SimpleString(q));
+   }
+
+   /**
+    * Commits a transaction in one phase on the session it was registered
+    * with, then forgets the registration. An unknown transaction is ignored.
+    * If the commit throws, the registration is kept.
+    *
+    * @param info identifies the transaction
+    * @throws Exception if the session fails to commit
+    */
+   public void commitTransactionOnePhase(TransactionInfo info) throws Exception
+   {
+      final TransactionId txId = info.getTransactionId();
+      final AMQSession owner = transactions.get(txId);
+      if (owner != null)
+      {
+         owner.commitOnePhase(info);
+      }
+      transactions.remove(txId);
+   }
+
+   /**
+    * Prepares an XA transaction on the session it was registered with. An
+    * unknown transaction is ignored; the registration is left in place.
+    *
+    * @param info identifies the transaction; its id must be an
+    *             {@link XATransactionId}
+    * @throws Exception if the session fails to prepare
+    */
+   public void prepareTransaction(TransactionInfo info) throws Exception
+   {
+      final XATransactionId xaId = (XATransactionId) info.getTransactionId();
+      final AMQSession owner = transactions.get(xaId);
+      if (owner == null)
+      {
+         return;
+      }
+      owner.prepareTransaction(xaId);
+   }
+
+   /**
+    * Performs the second phase commit of an XA transaction on the session it
+    * was registered with, then forgets the registration. An unknown
+    * transaction is ignored. If the commit throws, the registration is kept.
+    *
+    * @param info identifies the transaction; its id must be an
+    *             {@link XATransactionId}
+    * @throws Exception if the session fails to commit
+    */
+   public void commitTransactionTwoPhase(TransactionInfo info) throws Exception
+   {
+      final XATransactionId xaId = (XATransactionId) info.getTransactionId();
+      final AMQSession owner = transactions.get(xaId);
+      if (owner != null)
+      {
+         owner.commitTwoPhase(xaId);
+      }
+      transactions.remove(xaId);
+   }
+
+   /**
+    * Rolls back a transaction on the session it was registered with, then
+    * forgets the registration. An unknown transaction is ignored. If the
+    * rollback throws, the registration is kept.
+    *
+    * @param info identifies the transaction
+    * @throws Exception if the session fails to roll back
+    */
+   public void rollbackTransaction(TransactionInfo info) throws Exception
+   {
+      final TransactionId txId = info.getTransactionId();
+      final AMQSession owner = transactions.get(txId);
+      if (owner != null)
+      {
+         owner.rollback(info);
+      }
+      transactions.remove(txId);
+   }
+
+   /**
+    * Collects the transaction ids reported by {@code recover} of each
+    * registered session among the given session ids.
+    *
+    * @param sIds the sessions to query; may be {@code null}
+    * @return the collected ids; empty when {@code sIds} is {@code null} or
+    *         none of the ids names a registered session
+    */
+   public TransactionId[] recoverTransactions(Set<SessionId> sIds)
+   {
+      final List<TransactionId> collected = new ArrayList<TransactionId>();
+      if (sIds == null)
+      {
+         return collected.toArray(new TransactionId[0]);
+      }
+
+      for (SessionId sessionId : sIds)
+      {
+         final AMQSession session = this.sessions.get(sessionId);
+         if (session == null)
+         {
+            continue;
+         }
+         session.recover(collected);
+      }
+      return collected.toArray(new TransactionId[0]);
+   }
+
+   /**
+    * Validates credentials against the server's security manager.
+    *
+    * @param login    the user name
+    * @param passcode the password
+    * @return the security manager's verdict; {@code true} when the server has
+    *         no security manager or security is disabled in its configuration
+    */
+   public boolean validateUser(String login, String passcode)
+   {
+      final HornetQSecurityManager securityManager = server.getSecurityManager();
+
+      if (securityManager == null || !server.getConfiguration().isSecurityEnabled())
+      {
+         return true;
+      }
+      return securityManager.validateUser(login, passcode);
+   }
+
+   /**
+    * Asks the session a transaction was registered with to forget it, then
+    * drops the registration. An unknown transaction is ignored. If the
+    * session throws, the registration is kept.
+    *
+    * @param xid the transaction to forget
+    * @throws Exception if the session fails to forget the transaction
+    */
+   public void forgetTransaction(TransactionId xid) throws Exception
+   {
+      final AMQSession owner = transactions.get(xid);
+      if (owner != null)
+      {
+         owner.forget(xid);
+      }
+      transactions.remove(xid);
+   }
+
+   /**
+    * Records which session a transaction belongs to, so that the commit,
+    * prepare, rollback and forget methods of this class can find it.
+    *
+    * @param txId       the transaction id
+    * @param amqSession the session that owns the transaction
+    */
+   public void registerTx(TransactionId txId, AMQSession amqSession)
+   {
+      transactions.put(txId, amqSession);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManagerFactory.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManagerFactory.java
new file mode 100644
index 0000000..f8542bc
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireProtocolManagerFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * Factory that creates {@link OpenWireProtocolManager} instances and
+ * advertises the {@code OPENWIRE} protocol name.
+ *
+ * @author Howard Gao
+ */
+public class OpenWireProtocolManagerFactory implements ProtocolManagerFactory
+{
+   public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE";
+
+   /** Never handed out directly; {@link #getProtocols()} returns copies. */
+   private static final String[] SUPPORTED_PROTOCOLS = {OPENWIRE_PROTOCOL_NAME};
+
+   /**
+    * Creates a protocol manager for the given server.
+    *
+    * @param server               the server the manager works against
+    * @param incomingInterceptors ignored by this factory
+    * @param outgoingInterceptors ignored by this factory
+    * @return a new {@link OpenWireProtocolManager}
+    */
+   public ProtocolManager createProtocolManager(final HornetQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
+   {
+      return new OpenWireProtocolManager(server);
+   }
+
+   /**
+    * @return a new array holding the supported protocol names; callers may
+    *         modify it without affecting this factory
+    */
+   @Override
+   public String[] getProtocols()
+   {
+      return SUPPORTED_PROTOCOLS.clone();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireUtil.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireUtil.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireUtil.java
new file mode 100644
index 0000000..8662dc1
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/OpenWireUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire;
+
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.ByteSequence;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+
+/**
+ * Static helpers for translating between OpenWire and core data types.
+ */
+public class OpenWireUtil
+{
+   /** One or more consecutive {@code ".>"} segments; compiled once and reused. */
+   private static final java.util.regex.Pattern AMQ_ANY_DESCENDANT =
+      java.util.regex.Pattern.compile("(\\.>)+");
+
+   /**
+    * Copies the bytes described by the sequence into a new fixed-size buffer.
+    *
+    * @param bytes the source; {@code length} bytes starting at {@code offset}
+    *              of {@code data} are copied
+    * @return a new buffer of exactly {@code bytes.length} bytes
+    */
+   public static HornetQBuffer toHornetQBuffer(ByteSequence bytes)
+   {
+      HornetQBuffer buffer = HornetQBuffers.fixedBuffer(bytes.length);
+
+      buffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+      return buffer;
+   }
+
+
+   /**
+    * Maps a destination to a core address: {@code "jms.queue."} plus the
+    * physical name for queues, {@code "jms.topic."} plus the physical name for
+    * everything else.
+    *
+    * @param dest the OpenWire destination
+    * @return the corresponding core address
+    */
+   public static SimpleString toCoreAddress(ActiveMQDestination dest)
+   {
+      if (dest.isQueue())
+      {
+         return new SimpleString("jms.queue." + dest.getPhysicalName());
+      }
+      else
+      {
+         return new SimpleString("jms.topic." + dest.getPhysicalName());
+      }
+   }
+
+   /*
+    *This util converts amq wildcards to compatible core wildcards
+    *The conversion is like this:
+    *AMQ * wildcard --> Core * wildcard (no conversion)
+    *AMQ > wildcard --> Core # wildcard
+    *
+    *Only ">" preceded by a "." is rewritten; each run of ".>" becomes a
+    *single ".#".
+    */
+   public static String convertWildcard(String physicalName)
+   {
+      return AMQ_ANY_DESCENDANT.matcher(physicalName).replaceAll(".#");
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/SendingResult.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/SendingResult.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/SendingResult.java
new file mode 100644
index 0000000..158bc62
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/SendingResult.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+
+/**
+ * Mutable holder describing the outcome of a send: a "block next send" flag
+ * together with the paging store and address recorded alongside it.
+ *
+ * @author <a href="mailto:[email protected]">Howard Gao</a>
+ */
+public class SendingResult
+{
+   private boolean blockNextSend;
+   private PagingStoreImpl blockPagingStore;
+   private SimpleString blockingAddress;
+
+   /**
+    * @param block the new value of the block-next-send flag
+    */
+   public void setBlockNextSend(boolean block)
+   {
+      this.blockNextSend = block;
+   }
+
+   /**
+    * @return the block-next-send flag; {@code false} until set
+    */
+   public boolean isBlockNextSend()
+   {
+      return this.blockNextSend;
+   }
+
+   /**
+    * @param store the paging store to record; may be {@code null}
+    */
+   public void setBlockPagingStore(PagingStoreImpl store)
+   {
+      this.blockPagingStore = store;
+   }
+
+   /**
+    * @return the recorded paging store, or {@code null} if none was set
+    */
+   public PagingStoreImpl getBlockPagingStore()
+   {
+      return this.blockPagingStore;
+   }
+
+   /**
+    * @param address the address to record; may be {@code null}
+    */
+   public void setBlockingAddress(SimpleString address)
+   {
+      this.blockingAddress = address;
+   }
+
+   /**
+    * @return the recorded address, or {@code null} if none was set
+    */
+   public SimpleString getBlockingAddress()
+   {
+      return this.blockingAddress;
+   }
+
+   /**
+    * Reports whether the recorded paging store uses the
+    * {@link AddressFullMessagePolicy#FAIL} address-full policy.
+    *
+    * @return {@code true} only when a paging store has been recorded and its
+    *         policy is {@code FAIL}; {@code false} when no store was recorded
+    */
+   public boolean isSendFailIfNoSpace()
+   {
+      // No store recorded yet: there is no policy to consult, so do not fail
+      // with a NullPointerException.
+      if (this.blockPagingStore == null)
+      {
+         return false;
+      }
+      AddressFullMessagePolicy policy = this.blockPagingStore.getAddressFullMessagePolicy();
+      return policy == AddressFullMessagePolicy.FAIL;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java
new file mode 100644
index 0000000..29c490f
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQAbstractDeadLetterStrategy.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import org.apache.activemq.ActiveMQMessageAudit;
+import org.apache.activemq.command.Message;
+
+/**
+ * Base class for dead-letter strategies. It decides whether a message is
+ * eligible for the dead letter queue from three switches: duplicate auditing,
+ * handling of non-persistent messages and handling of expired messages.
+ */
+public abstract class AMQAbstractDeadLetterStrategy implements AMQDeadLetterStrategy
+{
+   private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
+   private boolean enableAudit = true;
+   private boolean processExpired = true;
+   private boolean processNonPersistent;
+
+   /**
+    * Rolls the message back in the audit. Does nothing when the message is
+    * {@code null} or auditing is disabled.
+    */
+   @Override
+   public void rollback(Message message)
+   {
+      if (message == null || !enableAudit)
+      {
+         return;
+      }
+      messageAudit.rollback(message);
+   }
+
+   /**
+    * Decides whether the message should go to the dead letter queue.
+    *
+    * @return {@code false} for a {@code null} message, for a message the audit
+    *         reports as a duplicate (when auditing is enabled), for a
+    *         non-persistent message unless non-persistent processing is on,
+    *         and for an expired message unless expired processing is on;
+    *         {@code true} otherwise
+    */
+   @Override
+   public boolean isSendToDeadLetterQueue(Message message)
+   {
+      if (message == null)
+      {
+         return false;
+      }
+
+      // The audit is consulted first and the other two checks always run, in
+      // this order, regardless of the audit's answer. Duplicates are rejected
+      // without any logging.
+      boolean rejectedAsDuplicate = enableAudit && messageAudit.isDuplicate(message);
+      boolean rejectedAsNonPersistent = !message.isPersistent() && !processNonPersistent;
+      boolean rejectedAsExpired = message.isExpired() && !processExpired;
+
+      return !(rejectedAsDuplicate || rejectedAsNonPersistent || rejectedAsExpired);
+   }
+
+   /**
+    * @return whether expired messages are eligible for the dead letter queue
+    */
+   @Override
+   public boolean isProcessExpired()
+   {
+      return processExpired;
+   }
+
+   /**
+    * @param processExpired
+    *           whether expired messages are eligible for the dead letter queue
+    */
+   @Override
+   public void setProcessExpired(boolean processExpired)
+   {
+      this.processExpired = processExpired;
+   }
+
+   /**
+    * @return whether non-persistent messages are eligible for the dead letter
+    *         queue
+    */
+   @Override
+   public boolean isProcessNonPersistent()
+   {
+      return processNonPersistent;
+   }
+
+   /**
+    * @param processNonPersistent
+    *           whether non-persistent messages are eligible for the dead letter
+    *           queue
+    */
+   @Override
+   public void setProcessNonPersistent(boolean processNonPersistent)
+   {
+      this.processNonPersistent = processNonPersistent;
+   }
+
+   /**
+    * @return whether the duplicate audit is consulted
+    */
+   public boolean isEnableAudit()
+   {
+      return this.enableAudit;
+   }
+
+   /**
+    * @param enableAudit
+    *           whether the duplicate audit is consulted
+    */
+   public void setEnableAudit(boolean enableAudit)
+   {
+      this.enableAudit = enableAudit;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQBrokerStoppedException.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQBrokerStoppedException.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQBrokerStoppedException.java
new file mode 100644
index 0000000..22eb20e
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQBrokerStoppedException.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+/**
+ * An {@link IllegalStateException} signalling that the broker has stopped.
+ */
+public class AMQBrokerStoppedException extends IllegalStateException
+{
+
+   private static final long serialVersionUID = -7543507221414251115L;
+
+   /**
+    * Creates the exception with no message and no cause.
+    */
+   public AMQBrokerStoppedException()
+   {
+   }
+
+   /**
+    * Creates the exception with the given message and cause.
+    *
+    * @param message the detail message
+    * @param cause   the underlying cause
+    */
+   public AMQBrokerStoppedException(String message, Throwable cause)
+   {
+      super(message, cause);
+   }
+
+   /**
+    * Creates the exception with the given message and no cause.
+    *
+    * @param s the detail message
+    */
+   public AMQBrokerStoppedException(String s)
+   {
+      super(s);
+   }
+
+   /**
+    * Creates the exception with the given cause and a {@code null} detail
+    * message (the cause's text is deliberately not used as the message).
+    *
+    * @param cause the underlying cause
+    */
+   public AMQBrokerStoppedException(Throwable cause)
+   {
+      super(null, cause);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectionContext.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectionContext.java
new file mode 100644
index 0000000..435d3b9
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectionContext.java
@@ -0,0 +1,393 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.state.ConnectionState;
+import org.hornetq.core.protocol.openwire.OpenWireConnection;
+import org.hornetq.core.protocol.openwire.OpenWireProtocolManager;
+
+/**
+ * Mutable bag of per-connection state for the OpenWire protocol layer: the
+ * connection and connector, the protocol manager acting as broker, transaction
+ * and security information, client identity and assorted flags.
+ * <p>
+ * Only the network-connection flag is accessed under the instance lock and the
+ * stopping flag is an {@link AtomicBoolean}; every other field is a plain,
+ * unsynchronized field.
+ */
+public class AMQConnectionContext
+{
+   private OpenWireConnection connection;
+   private AMQConnector connector;
+   // use protocol manager to represent the broker
+   private OpenWireProtocolManager broker;
+   private boolean inRecoveryMode;
+   private AMQTransaction transaction;
+   private ConcurrentHashMap<TransactionId, AMQTransaction> transactions;
+   private AMQSecurityContext securityContext;
+   private ConnectionId connectionId;
+   private String clientId;
+   private String userName;
+   private boolean reconnect;
+   private WireFormatInfo wireFormatInfo;
+   private Object longTermStoreContext;
+   private boolean producerFlowControl = true;
+   private AMQMessageAuthorizationPolicy messageAuthorizationPolicy;
+   private boolean networkConnection;
+   private boolean faultTolerant;
+   private final AtomicBoolean stopping = new AtomicBoolean();
+   private final MessageEvaluationContext messageEvaluationContext;
+   private boolean dontSendReponse;
+   private boolean clientMaster = true;
+   private ConnectionState connectionState;
+   private XATransactionId xid;
+
+   /**
+    * Creates a context with its own, new {@link MessageEvaluationContext}.
+    */
+   public AMQConnectionContext()
+   {
+      this.messageEvaluationContext = new MessageEvaluationContext();
+   }
+
+   /**
+    * Creates a context that uses the supplied evaluation context.
+    *
+    * @param messageEvaluationContext
+    *           the evaluation context to hold; stored as given, not copied
+    */
+   public AMQConnectionContext(MessageEvaluationContext messageEvaluationContext)
+   {
+      this.messageEvaluationContext = messageEvaluationContext;
+   }
+
+   /**
+    * Creates a context with a new evaluation context and takes the client id,
+    * user name and connection id from the given connection info.
+    *
+    * @param info
+    *           the connection info to read identity values from
+    */
+   public AMQConnectionContext(ConnectionInfo info)
+   {
+      this();
+      setClientId(info.getClientId());
+      setUserName(info.getUserName());
+      setConnectionId(info.getConnectionId());
+   }
+
+   /**
+    * Creates a shallow copy of this context. The copy shares this context's
+    * {@link MessageEvaluationContext} and all referenced objects; the stopping
+    * flag is copied by value into the new instance's own {@link AtomicBoolean}.
+    * <p>
+    * NOTE(review): {@code connectionState} and {@code xid} are not carried
+    * over to the copy - confirm whether that is intentional.
+    *
+    * @return the new context
+    */
+   public AMQConnectionContext copy()
+   {
+      AMQConnectionContext rc = new AMQConnectionContext(
+            this.messageEvaluationContext);
+      rc.connection = this.connection;
+      rc.connector = this.connector;
+      rc.broker = this.broker;
+      rc.inRecoveryMode = this.inRecoveryMode;
+      rc.transaction = this.transaction;
+      rc.transactions = this.transactions;
+      rc.securityContext = this.securityContext;
+      rc.connectionId = this.connectionId;
+      rc.clientId = this.clientId;
+      rc.userName = this.userName;
+      rc.reconnect = this.reconnect;
+      rc.wireFormatInfo = this.wireFormatInfo;
+      rc.longTermStoreContext = this.longTermStoreContext;
+      rc.producerFlowControl = this.producerFlowControl;
+      rc.messageAuthorizationPolicy = this.messageAuthorizationPolicy;
+      rc.networkConnection = this.networkConnection;
+      rc.faultTolerant = this.faultTolerant;
+      rc.stopping.set(this.stopping.get());
+      rc.dontSendReponse = this.dontSendReponse;
+      rc.clientMaster = this.clientMaster;
+      return rc;
+   }
+
+   /**
+    * @return the security context, or {@code null} if none is set
+    */
+   public AMQSecurityContext getSecurityContext()
+   {
+      return securityContext;
+   }
+
+   /**
+    * Stores the security context and synchronises the user name with it: the
+    * user name becomes the subject's user name, or {@code null} when the
+    * subject is {@code null}.
+    *
+    * @param subject
+    *           the security context to store; may be {@code null}
+    */
+   public void setSecurityContext(AMQSecurityContext subject)
+   {
+      this.securityContext = subject;
+      if (subject != null)
+      {
+         setUserName(subject.getUserName());
+      }
+      else
+      {
+         setUserName(null);
+      }
+   }
+
+   /**
+    * @return the broker being used.
+    */
+   public OpenWireProtocolManager getBroker()
+   {
+      return broker;
+   }
+
+   /**
+    * @param broker
+    *           being used
+    */
+   public void setBroker(OpenWireProtocolManager broker)
+   {
+      this.broker = broker;
+   }
+
+   /**
+    * @return the connection being used
+    */
+   public OpenWireConnection getConnection()
+   {
+      return connection;
+   }
+
+   /**
+    * @param connection
+    *           being used
+    */
+   public void setConnection(OpenWireConnection connection)
+   {
+      this.connection = connection;
+   }
+
+   /**
+    * @return the transaction being used.
+    */
+   public AMQTransaction getTransaction()
+   {
+      return transaction;
+   }
+
+   /**
+    * @param transaction
+    *           being used.
+    */
+   public void setTransaction(AMQTransaction transaction)
+   {
+      this.transaction = transaction;
+   }
+
+   /**
+    * @return the connector being used.
+    */
+   public AMQConnector getConnector()
+   {
+      return connector;
+   }
+
+   /**
+    * @param connector
+    *           being used.
+    */
+   public void setConnector(AMQConnector connector)
+   {
+      this.connector = connector;
+   }
+
+   /**
+    * @return the policy consulted by {@link #isAllowedToConsume}, or
+    *         {@code null} if none is set
+    */
+   public AMQMessageAuthorizationPolicy getMessageAuthorizationPolicy()
+   {
+      return messageAuthorizationPolicy;
+   }
+
+   /**
+    * Sets the policy used to decide if the current connection is authorized to
+    * consume a given message
+    */
+   public void setMessageAuthorizationPolicy(
+         AMQMessageAuthorizationPolicy messageAuthorizationPolicy)
+   {
+      this.messageAuthorizationPolicy = messageAuthorizationPolicy;
+   }
+
+   /**
+    * @return the recovery-mode flag
+    */
+   public boolean isInRecoveryMode()
+   {
+      return inRecoveryMode;
+   }
+
+   /**
+    * @param inRecoveryMode
+    *           the recovery-mode flag to set
+    */
+   public void setInRecoveryMode(boolean inRecoveryMode)
+   {
+      this.inRecoveryMode = inRecoveryMode;
+   }
+
+   /**
+    * @return the transaction map as stored (not a copy); {@code null} until
+    *         one is set
+    */
+   public ConcurrentHashMap<TransactionId, AMQTransaction> getTransactions()
+   {
+      return transactions;
+   }
+
+   /**
+    * @param transactions
+    *           the transaction map to hold; stored as given, not copied
+    */
+   public void setTransactions(
+         ConcurrentHashMap<TransactionId, AMQTransaction> transactions)
+   {
+      this.transactions = transactions;
+   }
+
+   /**
+    * @return {@code true} when a transaction is currently set on this context
+    */
+   public boolean isInTransaction()
+   {
+      return transaction != null;
+   }
+
+   /**
+    * @return the client id
+    */
+   public String getClientId()
+   {
+      return clientId;
+   }
+
+   /**
+    * @param clientId
+    *           the client id to set
+    */
+   public void setClientId(String clientId)
+   {
+      this.clientId = clientId;
+   }
+
+   /**
+    * @return the reconnect flag
+    */
+   public boolean isReconnect()
+   {
+      return reconnect;
+   }
+
+   /**
+    * @param reconnect
+    *           the reconnect flag to set
+    */
+   public void setReconnect(boolean reconnect)
+   {
+      this.reconnect = reconnect;
+   }
+
+   /**
+    * @return the wire format info
+    */
+   public WireFormatInfo getWireFormatInfo()
+   {
+      return wireFormatInfo;
+   }
+
+   /**
+    * @param wireFormatInfo
+    *           the wire format info to set
+    */
+   public void setWireFormatInfo(WireFormatInfo wireFormatInfo)
+   {
+      this.wireFormatInfo = wireFormatInfo;
+   }
+
+   /**
+    * @return the connection id
+    */
+   public ConnectionId getConnectionId()
+   {
+      return connectionId;
+   }
+
+   /**
+    * @param connectionId
+    *           the connection id to set
+    */
+   public void setConnectionId(ConnectionId connectionId)
+   {
+      this.connectionId = connectionId;
+   }
+
+   /**
+    * @return the user name
+    */
+   public String getUserName()
+   {
+      return userName;
+   }
+
+   /**
+    * @param userName
+    *           the user name to set; also overwritten by
+    *           {@link #setSecurityContext}
+    */
+   public void setUserName(String userName)
+   {
+      this.userName = userName;
+   }
+
+   /**
+    * @return the evaluation context fixed at construction time
+    */
+   public MessageEvaluationContext getMessageEvaluationContext()
+   {
+      return messageEvaluationContext;
+   }
+
+   /**
+    * @return the opaque long-term store context object
+    */
+   public Object getLongTermStoreContext()
+   {
+      return longTermStoreContext;
+   }
+
+   /**
+    * @param longTermStoreContext
+    *           the opaque long-term store context object to set
+    */
+   public void setLongTermStoreContext(Object longTermStoreContext)
+   {
+      this.longTermStoreContext = longTermStoreContext;
+   }
+
+   /**
+    * @return the producer flow control flag; {@code true} by default
+    */
+   public boolean isProducerFlowControl()
+   {
+      return producerFlowControl;
+   }
+
+   /**
+    * Sets the producer flow control flag.
+    *
+    * @param disableProducerFlowControl
+    *           despite its name, this value is stored unchanged as the
+    *           producer flow control flag ({@code true} means enabled)
+    */
+   public void setProducerFlowControl(boolean disableProducerFlowControl)
+   {
+      this.producerFlowControl = disableProducerFlowControl;
+   }
+
+   /**
+    * Asks the message authorization policy, if one is set, whether this
+    * context may consume the referenced message.
+    *
+    * @param n
+    *           the message reference to check
+    * @return the policy's answer, or {@code true} when no policy is set
+    * @throws IOException
+    *            declared by this method; the calls that may raise it are not
+    *            visible here
+    */
+   public boolean isAllowedToConsume(MessageReference n) throws IOException
+   {
+      if (messageAuthorizationPolicy != null)
+      {
+         return messageAuthorizationPolicy.isAllowedToConsume(this,
+               n.getMessage());
+      }
+      return true;
+   }
+
+   /**
+    * @return the network-connection flag (read under the instance lock)
+    */
+   public synchronized boolean isNetworkConnection()
+   {
+      return networkConnection;
+   }
+
+   /**
+    * @param networkConnection
+    *           the network-connection flag to set (written under the instance
+    *           lock)
+    */
+   public synchronized void setNetworkConnection(boolean networkConnection)
+   {
+      this.networkConnection = networkConnection;
+   }
+
+   /**
+    * @return the stopping flag itself, so callers can read and update it
+    */
+   public AtomicBoolean getStopping()
+   {
+      return stopping;
+   }
+
+   /**
+    * @param b
+    *           the "don't send response" flag to set
+    */
+   public void setDontSendReponse(boolean b)
+   {
+      this.dontSendReponse = b;
+   }
+
+   /**
+    * @return the "don't send response" flag
+    */
+   public boolean isDontSendReponse()
+   {
+      return dontSendReponse;
+   }
+
+   /**
+    * @return the clientMaster
+    */
+   public boolean isClientMaster()
+   {
+      return this.clientMaster;
+   }
+
+   /**
+    * @param clientMaster
+    *           the clientMaster to set
+    */
+   public void setClientMaster(boolean clientMaster)
+   {
+      this.clientMaster = clientMaster;
+   }
+
+   /**
+    * @return the fault-tolerant flag
+    */
+   public boolean isFaultTolerant()
+   {
+      return faultTolerant;
+   }
+
+   /**
+    * @param faultTolerant
+    *           the fault-tolerant flag to set
+    */
+   public void setFaultTolerant(boolean faultTolerant)
+   {
+      this.faultTolerant = faultTolerant;
+   }
+
+   /**
+    * @param connectionState
+    *           the connection state to set
+    */
+   public void setConnectionState(ConnectionState connectionState)
+   {
+      this.connectionState = connectionState;
+   }
+
+   /**
+    * @return the connection state
+    */
+   public ConnectionState getConnectionState()
+   {
+      return this.connectionState;
+   }
+
+   /**
+    * @param id
+    *           the XA transaction id to set
+    */
+   public void setXid(XATransactionId id)
+   {
+      this.xid = id;
+   }
+
+   /**
+    * @return the XA transaction id
+    */
+   public XATransactionId getXid()
+   {
+      return xid;
+   }
+
+   /**
+    * @return {@code true} only when a connector is set and it reports that
+    *         link stealing is allowed
+    */
+   public boolean isAllowLinkStealing()
+   {
+      return connector != null && connector.isAllowLinkStealing();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnector.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnector.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnector.java
new file mode 100644
index 0000000..e256736
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnector.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionControl;
+import org.hornetq.core.protocol.openwire.OpenWireConnection;
+
+/**
+ * Connector operations used by the OpenWire protocol layer: broker
+ * information, statistics, cluster-client update settings, link stealing and
+ * connection start/stop callbacks.
+ */
+public interface AMQConnector
+{
+   /**
+    * @return brokerInfo
+    */
+   BrokerInfo getBrokerInfo();
+
+   /**
+    * @return the statistics for this connector
+    */
+   AMQConnectorStatistics getStatistics();
+
+   /**
+    * @return true if client connections are updated when brokers leave/join a
+    *         cluster
+    */
+   boolean isUpdateClusterClients();
+
+   /**
+    * @return true if clients should be re-balanced across the cluster
+    */
+   boolean isRebalanceClusterClients();
+
+   /**
+    * Update all the connections with information about the connected brokers
+    * in the cluster
+    */
+   void updateClientClusterInfo();
+
+   /**
+    * @return true if clients should be updated when a broker is removed
+    *         (the original wording said "from a broker"; presumably "from the
+    *         cluster" is meant - confirm)
+    */
+   boolean isUpdateClusterClientsOnRemove();
+
+   /**
+    * @return a connection count; presumably the number of connections
+    *         currently held by this connector - confirm against implementations
+    */
+   int connectionCount();
+
+   /**
+    * If enabled, older connections with the same clientID are stopped
+    *
+    * @return true/false if link stealing is enabled
+    */
+   boolean isAllowLinkStealing();
+
+   //see TransportConnector
+   ConnectionControl getConnectionControl();
+
+   /**
+    * Callback taking a connection; presumably invoked once that connection has
+    * started - confirm against implementations.
+    */
+   void onStarted(OpenWireConnection connection);
+
+   /**
+    * Callback taking a connection; presumably invoked once that connection has
+    * stopped - confirm against implementations.
+    */
+   void onStopped(OpenWireConnection connection);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectorStatistics.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectorStatistics.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectorStatistics.java
new file mode 100644
index 0000000..ffe477b
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConnectorStatistics.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import org.apache.activemq.management.CountStatisticImpl;
+import org.apache.activemq.management.PollCountStatisticImpl;
+import org.apache.activemq.management.StatsImpl;
+
+/**
+ * Statistics bundle for a connector. It owns five counters (enqueues,
+ * dequeues, consumers, messages and messagesCached) and registers each of them
+ * with the {@link StatsImpl} base class under its own name.
+ */
+public class AMQConnectorStatistics extends StatsImpl
+{
+
+   protected CountStatisticImpl enqueues;
+   protected CountStatisticImpl dequeues;
+   protected CountStatisticImpl consumers;
+   protected CountStatisticImpl messages;
+   protected PollCountStatisticImpl messagesCached;
+
+   /**
+    * Creates the five counters and registers them with the base class.
+    */
+   public AMQConnectorStatistics()
+   {
+
+      enqueues = new CountStatisticImpl("enqueues",
+            "The number of messages that have been sent to the destination");
+      dequeues = new CountStatisticImpl("dequeues",
+            "The number of messages that have been dispatched from the destination");
+      consumers = new CountStatisticImpl(
+            "consumers",
+            "The number of consumers that that are subscribing to messages from the destination");
+      messages = new CountStatisticImpl("messages",
+            "The number of messages that that are being held by the destination");
+      messagesCached = new PollCountStatisticImpl("messagesCached",
+            "The number of messages that are held in the destination's memory cache");
+
+      addStatistic("enqueues", enqueues);
+      addStatistic("dequeues", dequeues);
+      addStatistic("consumers", consumers);
+      addStatistic("messages", messages);
+      addStatistic("messagesCached", messagesCached);
+   }
+
+   /**
+    * @return the enqueue counter
+    */
+   public CountStatisticImpl getEnqueues()
+   {
+      return enqueues;
+   }
+
+   /**
+    * @return the dequeue counter
+    */
+   public CountStatisticImpl getDequeues()
+   {
+      return dequeues;
+   }
+
+   /**
+    * @return the consumer counter
+    */
+   public CountStatisticImpl getConsumers()
+   {
+      return consumers;
+   }
+
+   /**
+    * @return the cached-messages counter currently held by this instance
+    */
+   public PollCountStatisticImpl getMessagesCached()
+   {
+      return messagesCached;
+   }
+
+   /**
+    * @return the messages counter
+    */
+   public CountStatisticImpl getMessages()
+   {
+      return messages;
+   }
+
+   /**
+    * Delegates to the base class reset and then explicitly resets the enqueue
+    * and dequeue counters. The other three counters are not reset explicitly
+    * here.
+    */
+   @Override
+   public void reset()
+   {
+      super.reset();
+      enqueues.reset();
+      dequeues.reset();
+   }
+
+   /**
+    * Enables or disables this bundle and each of its five counters.
+    *
+    * @param enabled
+    *           the enabled state to apply
+    */
+   @Override
+   public void setEnabled(boolean enabled)
+   {
+      super.setEnabled(enabled);
+      enqueues.setEnabled(enabled);
+      dequeues.setEnabled(enabled);
+      consumers.setEnabled(enabled);
+      messages.setEnabled(enabled);
+      messagesCached.setEnabled(enabled);
+   }
+
+   /**
+    * Links each counter to the matching counter of the given parent bundle, or
+    * clears every counter's parent when {@code parent} is {@code null}.
+    *
+    * @param parent
+    *           the parent statistics bundle, or {@code null} to detach
+    */
+   public void setParent(AMQConnectorStatistics parent)
+   {
+      if (parent != null)
+      {
+         enqueues.setParent(parent.enqueues);
+         dequeues.setParent(parent.dequeues);
+         consumers.setParent(parent.consumers);
+         messagesCached.setParent(parent.messagesCached);
+         messages.setParent(parent.messages);
+      }
+      else
+      {
+         enqueues.setParent(null);
+         dequeues.setParent(null);
+         consumers.setParent(null);
+         messagesCached.setParent(null);
+         messages.setParent(null);
+      }
+   }
+
+   /**
+    * Replaces the cached-messages counter held by this instance. The field is
+    * only reassigned: the new counter is not registered through
+    * {@code addStatistic}, so the registration made in the constructor still
+    * refers to the previous counter.
+    *
+    * @param messagesCached
+    *           the counter to hold from now on
+    */
+   public void setMessagesCached(PollCountStatisticImpl messagesCached)
+   {
+      this.messagesCached = messagesCached;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumer.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumer.java
new file mode 100644
index 0000000..60e1ec2
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumer.java
@@ -0,0 +1,390 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.wireformat.WireFormat;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.openwire.OpenWireMessageConverter;
+import org.hornetq.core.protocol.openwire.OpenWireUtil;
+import org.hornetq.core.server.QueueQueryResult;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.jms.client.HornetQDestination;
+
+/**
+ * Binds one OpenWire consumer (described by a {@link ConsumerInfo}) to a core
+ * server consumer created on the owning {@link AMQSession}'s core session.
+ * <p>
+ * The class tracks every message it has dispatched in {@code deliveringRefs}
+ * until the client acknowledges it, and keeps a running count of outstanding
+ * deliveries so that delivery can be paused once the client's prefetch size
+ * is reached.
+ */
+public class AMQConsumer implements BrowserListener
+{
+   private final AMQSession session;
+   private final ActiveMQDestination actualDest;
+   private final ConsumerInfo info;
+   // Id of the core consumer; stays -1 until init() has run.
+   private long nativeId = -1;
+   // Name of the subscription queue; only assigned for topic destinations.
+   private SimpleString subQueueName = null;
+
+   private final int prefetchSize;
+   // Incremented on every delivery, decreased by the number of messages an ack covers.
+   private final AtomicInteger currentSize;
+   // Messages handed to the client and not yet removed by an acknowledgement.
+   private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<MessageInfo>();
+
+   /**
+    * @param amqSession the session this consumer belongs to
+    * @param d          the destination to consume from
+    * @param info       the OpenWire consumer description; its prefetch size is
+    *                   captured here
+    */
+   public AMQConsumer(AMQSession amqSession, ActiveMQDestination d, ConsumerInfo info)
+   {
+      this.session = amqSession;
+      this.actualDest = d;
+      this.info = info;
+      this.prefetchSize = info.getPrefetchSize();
+      this.currentSize = new AtomicInteger(0);
+   }
+
+   /**
+    * Creates the core consumer for this OpenWire consumer.
+    * <p>
+    * For topics the address is {@code jms.topic.<name>} and a subscription
+    * queue is used: a durable one (re-created when the selector or the topic
+    * changed) or a randomly named one for non-durable subscribers. For queues
+    * the consumer is created directly on {@code jms.queue.<name>}. Browsers
+    * additionally register this object as the consumer's browser listener.
+    *
+    * @throws IllegalStateException if the durable subscription already has a
+    *                               consumer
+    * @throws Exception             if any core session operation fails
+    */
+   public void init() throws Exception
+   {
+      AMQServerSession coreSession = session.getCoreSession();
+
+      SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
+
+      nativeId = session.getCoreServer().getStorageManager().generateID();
+
+      SimpleString address = new SimpleString(this.actualDest.getPhysicalName());
+
+      if (this.actualDest.isTopic())
+      {
+         String physicalName = this.actualDest.getPhysicalName();
+         if (physicalName.contains(".>"))
+         {
+            //wildcard
+            physicalName = OpenWireUtil.convertWildcard(physicalName);
+         }
+
+         // on recreate we don't need to create queues
+         address = new SimpleString("jms.topic." + physicalName);
+         if (info.isDurable())
+         {
+            subQueueName = new SimpleString(
+                  HornetQDestination.createQueueNameForDurableSubscription(
+                        true, info.getClientId(), info.getSubscriptionName()));
+
+            QueueQueryResult result = coreSession.executeQueueQuery(subQueueName);
+            if (result.isExists())
+            {
+               // Already exists
+               if (result.getConsumerCount() > 0)
+               {
+                  throw new IllegalStateException(
+                        "Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+               }
+
+               SimpleString oldFilterString = result.getFilterString();
+
+               // Changed when exactly one side is null, or both are set and differ.
+               boolean selectorChanged = selector == null
+                     ? oldFilterString != null
+                     : (oldFilterString == null || !oldFilterString.equals(selector));
+
+               SimpleString oldTopicName = result.getAddress();
+
+               boolean topicChanged = !oldTopicName.equals(address);
+
+               if (selectorChanged || topicChanged)
+               {
+                  // Delete the old durable sub
+                  coreSession.deleteQueue(subQueueName);
+
+                  // Create the new one
+                  coreSession.createQueue(address, subQueueName, selector, false, true);
+               }
+
+            }
+            else
+            {
+               coreSession.createQueue(address, subQueueName, selector, false, true);
+            }
+         }
+         else
+         {
+            // Non-durable subscriber: use a randomly named subscription queue.
+            subQueueName = new SimpleString(UUID.randomUUID().toString());
+
+            coreSession.createQueue(address, subQueueName, selector, true, false);
+         }
+
+         // The selector was given to the subscription queue above, so the consumer gets none.
+         coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, Integer.MAX_VALUE);
+      }
+      else
+      {
+         SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
+         coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, Integer.MAX_VALUE);
+      }
+
+      if (info.isBrowser())
+      {
+         AMQServerConsumer coreConsumer = coreSession.getConsumer(nativeId);
+         coreConsumer.setBrowserListener(this);
+      }
+
+   }
+
+   /**
+    * @return the id of the core consumer, or -1 if {@link #init()} has not run
+    */
+   public long getNativeId()
+   {
+      return this.nativeId;
+   }
+
+   /**
+    * @return the OpenWire consumer id taken from the consumer info
+    */
+   public ConsumerId getId()
+   {
+      return info.getConsumerId();
+   }
+
+   /**
+    * @return the wire format of the owning session
+    */
+   public WireFormat getMarshaller()
+   {
+      return this.session.getMarshaller();
+   }
+
+   /**
+    * Lowers the outstanding-delivery count by {@code n} and, if the result is
+    * below the prefetch size, zeroes the core consumer's credits and grants it
+    * {@code Integer.MAX_VALUE} credits again.
+    *
+    * @param n number of deliveries that no longer count against the prefetch
+    * @throws Exception if the core session rejects the credit update
+    */
+   public void acquireCredit(int n) throws Exception
+   {
+      // Use the value returned by the atomic update; a separate get() could
+      // observe changes made by a concurrent delivery or acknowledgement.
+      int outstanding = this.currentSize.addAndGet(-n);
+      if (outstanding < prefetchSize)
+      {
+         AtomicInteger credits = session.getCoreSession().getConsumerCredits(nativeId);
+         credits.set(0);
+         session.getCoreSession().receiveConsumerCredits(nativeId, Integer.MAX_VALUE);
+      }
+   }
+
+   /**
+    * Counts one more outstanding delivery and, when the count reaches the
+    * prefetch size exactly, sends zero credits to the core consumer.
+    *
+    * @throws Exception if the core session rejects the credit update
+    */
+   public void checkCreditOnDelivery() throws Exception
+   {
+      // Compare the value produced by this increment so that exactly one
+      // caller sees the prefetch limit being reached.
+      int outstanding = this.currentSize.incrementAndGet();
+
+      if (outstanding == prefetchSize)
+      {
+         //stop because reach prefetchSize
+         session.getCoreSession().receiveConsumerCredits(nativeId, 0);
+      }
+   }
+
+   /**
+    * Converts a core message to an OpenWire dispatch, records it as in
+    * delivery, hands it to the session and updates the credit accounting.
+    *
+    * @param message       the core message to deliver
+    * @param deliveryCount the core delivery count for the message
+    * @return the size reported by the dispatched OpenWire message, or 0 if
+    *         anything failed
+    */
+   public int handleDeliver(ServerMessage message, int deliveryCount)
+   {
+      MessageDispatch dispatch;
+      try
+      {
+         //decrement deliveryCount as AMQ client tends to add 1.
+         dispatch = OpenWireMessageConverter.createMessageDispatch(message, deliveryCount - 1, this);
+         int size = dispatch.getMessage().getSize();
+         this.deliveringRefs.add(new MessageInfo(dispatch.getMessage().getMessageId(), message.getMessageID(), size));
+         session.deliverMessage(dispatch);
+         checkCreditOnDelivery();
+         return size;
+      }
+      catch (IOException e)
+      {
+         // NOTE(review): failure is reported only through the 0 return value;
+         // nothing is logged and a MessageInfo already queued is not removed.
+         return 0;
+      }
+      catch (Throwable t)
+      {
+         // NOTE(review): same silent handling as above, for every other failure.
+         return 0;
+      }
+   }
+
+   /**
+    * Applies a client acknowledgement to the messages tracked in
+    * {@code deliveringRefs} and then releases the matching amount of prefetch
+    * budget through {@link #acquireCredit(int)}.
+    * <ul>
+    * <li>individual ack: the message equal to the last id is removed,
+    * individually acknowledged and the core session committed;</li>
+    * <li>redelivered ack, delivered ack, expired ack: nothing is removed, one
+    * unit of credit is released;</li>
+    * <li>poison ack: the messages from the first id (or from the head when the
+    * first id is null) up to the last id are removed and moved to the dead
+    * letter address, committing after each one;</li>
+    * <li>any other ack: the same range is removed, or only flagged as locally
+    * acked when the ack belongs to a local transaction; outside a local
+    * transaction the message that ended the range is acknowledged on the
+    * core session.</li>
+    * </ul>
+    *
+    * @param ack the OpenWire acknowledgement
+    * @throws Exception if a core session operation fails
+    */
+   public void acknowledge(MessageAck ack) throws Exception
+   {
+      MessageId first = ack.getFirstMessageId();
+      MessageId lastm = ack.getLastMessageId();
+      TransactionId tid = ack.getTransactionId();
+      boolean isLocalTx = (tid != null) && tid.isLocalTransaction();
+      boolean single = lastm.equals(first);
+
+      MessageInfo mi = null;
+      // Number of deliveries this ack frees from the prefetch window.
+      int n = 0;
+
+      if (ack.isIndividualAck())
+      {
+         Iterator<MessageInfo> iter = deliveringRefs.iterator();
+         while (iter.hasNext())
+         {
+            mi = iter.next();
+            if (mi.amqId.equals(lastm))
+            {
+               n++;
+               iter.remove();
+               session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
+               session.getCoreSession().commit();
+               break;
+            }
+         }
+      }
+      else if (ack.isRedeliveredAck())
+      {
+         //client tells that this message is for redlivery.
+         //do nothing until poisoned.
+         n = 1;
+      }
+      else if (ack.isPoisonAck())
+      {
+         //send to dlq
+         Iterator<MessageInfo> iter = deliveringRefs.iterator();
+         boolean firstFound = false;
+         while (iter.hasNext())
+         {
+            mi = iter.next();
+            if (mi.amqId.equals(first))
+            {
+               n++;
+               iter.remove();
+               session.getCoreSession().moveToDeadLetterAddress(nativeId, mi.nativeId, ack.getPoisonCause());
+               session.getCoreSession().commit();
+               if (single)
+               {
+                  break;
+               }
+               firstFound = true;
+            }
+            else if (firstFound || first == null)
+            {
+               // Inside the range (a null first id means the range starts at the head).
+               n++;
+               iter.remove();
+               session.getCoreSession().moveToDeadLetterAddress(nativeId, mi.nativeId, ack.getPoisonCause());
+               session.getCoreSession().commit();
+               if (mi.amqId.equals(lastm))
+               {
+                  break;
+               }
+            }
+         }
+      }
+      else if (ack.isDeliveredAck() || ack.isExpiredAck())
+      {
+         //ToDo: implement with tests
+         n = 1;
+      }
+      else
+      {
+         Iterator<MessageInfo> iter = deliveringRefs.iterator();
+         boolean firstFound = false;
+         while (iter.hasNext())
+         {
+            MessageInfo ami = iter.next();
+            if (ami.amqId.equals(first))
+            {
+               n++;
+               if (!isLocalTx)
+               {
+                  iter.remove();
+               }
+               else
+               {
+                  // Kept in the queue until finishTx()/rollbackTx() deals with it.
+                  ami.setLocalAcked(true);
+               }
+               if (single)
+               {
+                  mi = ami;
+                  break;
+               }
+               firstFound = true;
+            }
+            else if (firstFound || first == null)
+            {
+               n++;
+               if (!isLocalTx)
+               {
+                  iter.remove();
+               }
+               else
+               {
+                  ami.setLocalAcked(true);
+               }
+               if (ami.amqId.equals(lastm))
+               {
+                  mi = ami;
+                  break;
+               }
+            }
+         }
+         // mi is only set when the end of the range was actually found.
+         if (mi != null && !isLocalTx)
+         {
+            session.getCoreSession().acknowledge(nativeId, mi.nativeId);
+         }
+      }
+
+      acquireCredit(n);
+   }
+
+   /**
+    * Sends the client a dispatch carrying this consumer's id and neither a
+    * message nor a destination.
+    */
+   @Override
+   public void browseFinished()
+   {
+      MessageDispatch md = new MessageDispatch();
+      md.setConsumerId(info.getConsumerId());
+      md.setMessage(null);
+      md.setDestination(null);
+
+      session.deliverMessage(md);
+   }
+
+   /**
+    * Not implemented yet.
+    *
+    * @return always {@code false}
+    */
+   public boolean handledTransactionalMsg()
+   {
+      // TODO Auto-generated method stub
+      return false;
+   }
+
+   /**
+    * Called before the session commits a local transaction: removes every
+    * locally acked message from {@code deliveringRefs} and acknowledges the
+    * last one removed on the core session.
+    *
+    * @throws Exception if the core acknowledge fails
+    */
+   public void finishTx() throws Exception
+   {
+      MessageInfo lastMi = null;
+
+      Iterator<MessageInfo> iter = deliveringRefs.iterator();
+      while (iter.hasNext())
+      {
+         MessageInfo mi = iter.next();
+         if (mi.isLocalAcked())
+         {
+            iter.remove();
+            lastMi = mi;
+         }
+      }
+
+      if (lastMi != null)
+      {
+         session.getCoreSession().acknowledge(nativeId, lastMi.nativeId);
+      }
+   }
+
+   /**
+    * Adds the native id of every locally acked message to {@code acked} and
+    * acknowledges the last such message on the core session. The entries stay
+    * in {@code deliveringRefs} and their local-acked flag is left set.
+    *
+    * @param acked receives the native ids of the locally acked messages
+    * @throws Exception if the core acknowledge fails
+    */
+   public void rollbackTx(Set<Long> acked) throws Exception
+   {
+      MessageInfo lastMi = null;
+
+      Iterator<MessageInfo> iter = deliveringRefs.iterator();
+      while (iter.hasNext())
+      {
+         MessageInfo mi = iter.next();
+         if (mi.isLocalAcked())
+         {
+            acked.add(mi.nativeId);
+            lastMi = mi;
+         }
+      }
+
+      if (lastMi != null)
+      {
+         session.getCoreSession().acknowledge(nativeId, lastMi.nativeId);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
new file mode 100644
index 0000000..4c8c29e
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+/**
+ * Mutable holder grouping a connection context, a region destination, a
+ * subscription and a wildcard flag. All four values start out unset
+ * ({@code null} / {@code false}) and are changed only through the setters.
+ */
+public class AMQConsumerBrokerExchange
+{
+   private AMQConnectionContext connectionContext;
+   private AMQDestination regionDestination;
+   private AMQSubscription subscription;
+   private boolean wildcard;
+
+   /** @return the stored connection context */
+   public AMQConnectionContext getConnectionContext()
+   {
+      return connectionContext;
+   }
+
+   /** @param connectionContext the connection context to store */
+   public void setConnectionContext(AMQConnectionContext connectionContext)
+   {
+      this.connectionContext = connectionContext;
+   }
+
+   /** @return the stored region destination */
+   public AMQDestination getRegionDestination()
+   {
+      return regionDestination;
+   }
+
+   /** @param regionDestination the region destination to store */
+   public void setRegionDestination(AMQDestination regionDestination)
+   {
+      this.regionDestination = regionDestination;
+   }
+
+   /** @return the stored subscription */
+   public AMQSubscription getSubscription()
+   {
+      return subscription;
+   }
+
+   /** @param subscription the subscription to store */
+   public void setSubscription(AMQSubscription subscription)
+   {
+      this.subscription = subscription;
+   }
+
+   /** @return the stored wildcard flag */
+   public boolean isWildcard()
+   {
+      return wildcard;
+   }
+
+   /** @param wildcard the wildcard flag to store */
+   public void setWildcard(boolean wildcard)
+   {
+      this.wildcard = wildcard;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDeadLetterStrategy.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDeadLetterStrategy.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDeadLetterStrategy.java
new file mode 100644
index 0000000..7df132d
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDeadLetterStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+
+/**
+ * Pluggable policy for dead letter handling: whether a message goes to a dead
+ * letter queue, and which destination that queue is.
+ */
+public interface AMQDeadLetterStrategy
+{
+
+   /**
+    * Decides whether the given message should be sent to a dead letter
+    * queue. Implementations may, for example, treat expired or
+    * non-persistent messages differently from other messages.
+    *
+    * @param message the message under consideration
+    * @return true if message should be sent to a dead letter queue
+    */
+   boolean isSendToDeadLetterQueue(Message message);
+
+   /**
+    * @param message      the message to be dead-lettered
+    * @param subscription the subscription the message was dispatched on
+    * @return the dead letter queue for the given message and subscription
+    */
+   ActiveMQDestination getDeadLetterQueueFor(Message message, AMQSubscription subscription);
+
+   /**
+    * @return true if this strategy processes expired messages
+    */
+   boolean isProcessExpired();
+
+   /**
+    * @param processExpired the processExpired value to set
+    */
+   void setProcessExpired(boolean processExpired);
+
+   /**
+    * @return the processNonPersistent value
+    */
+   boolean isProcessNonPersistent();
+
+   /**
+    * @param processNonPersistent the processNonPersistent value to set
+    */
+   void setProcessNonPersistent(boolean processNonPersistent);
+
+   /**
+    * @param destination the destination to test
+    * @return the strategy's answer to whether {@code destination} is a DLQ
+    */
+   boolean isDLQ(ActiveMQDestination destination);
+
+   /**
+    * Rolls back a message that was already processed by a DLQ, for the case
+    * where the message is moved or retried. Without this, a strategy that
+    * audits messages would consider the message a duplicate.
+    *
+    * @param message the message to roll back
+    */
+   void rollback(Message message);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestination.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestination.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestination.java
new file mode 100644
index 0000000..b2c5ad2
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestination.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.Usage;
+
+/**
+ * Contract of a broker-side destination as seen by the OpenWire protocol
+ * layer: subscription and producer registration, sending and acknowledging,
+ * configuration accessors and notification callbacks.
+ */
+public interface AMQDestination
+{
+   AMQDeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new AMQSharedDeadLetterStrategy();
+   long DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL = 30000;
+
+   // ---- subscriptions and producers ----
+
+   void addSubscription(AMQConnectionContext context, AMQSubscription sub) throws Exception;
+
+   void removeSubscription(AMQConnectionContext context, AMQSubscription sub,
+         long lastDeliveredSequenceId) throws Exception;
+
+   void addProducer(AMQConnectionContext context, ProducerInfo info) throws Exception;
+
+   void removeProducer(AMQConnectionContext context, ProducerInfo info) throws Exception;
+
+   // ---- send / acknowledge ----
+
+   void send(AMQProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
+
+   void acknowledge(AMQConnectionContext context, AMQSubscription sub,
+         MessageAck ack, MessageReference node) throws IOException;
+
+   // ---- garbage collection ----
+
+   long getInactiveTimoutBeforeGC();
+
+   void markForGC(long timeStamp);
+
+   boolean canGC();
+
+   void gc();
+
+   // ---- identity, usage and lifecycle ----
+
+   ActiveMQDestination getActiveMQDestination();
+
+   MemoryUsage getMemoryUsage();
+
+   void setMemoryUsage(MemoryUsage memoryUsage);
+
+   void dispose(AMQConnectionContext context) throws IOException;
+
+   boolean isDisposed();
+
+   AMQDestinationStatistics getDestinationStatistics();
+
+   AMQDeadLetterStrategy getDeadLetterStrategy();
+
+   Message[] browse();
+
+   String getName();
+
+   AMQMessageStore getMessageStore();
+
+   // ---- configuration accessors ----
+
+   boolean isProducerFlowControl();
+
+   void setProducerFlowControl(boolean value);
+
+   boolean isAlwaysRetroactive();
+
+   void setAlwaysRetroactive(boolean value);
+
+   /**
+    * Sets the interval at which warnings about producers being blocked by
+    * resource usage are triggered. A value of 0 or less disables the
+    * warnings.
+    *
+    * @param blockedProducerWarningInterval
+    *           the interval at which warnings about blocked producers are
+    *           triggered
+    */
+   void setBlockedProducerWarningInterval(long blockedProducerWarningInterval);
+
+   /**
+    * @return the interval at which warnings about blocked producers are
+    *         triggered
+    */
+   long getBlockedProducerWarningInterval();
+
+   int getMaxProducersToAudit();
+
+   void setMaxProducersToAudit(int maxProducersToAudit);
+
+   int getMaxAuditDepth();
+
+   void setMaxAuditDepth(int maxAuditDepth);
+
+   boolean isEnableAudit();
+
+   void setEnableAudit(boolean enableAudit);
+
+   boolean isActive();
+
+   int getMaxPageSize();
+
+   void setMaxPageSize(int maxPageSize);
+
+   int getMaxBrowsePageSize();
+
+   void setMaxBrowsePageSize(int maxPageSize);
+
+   boolean isUseCache();
+
+   void setUseCache(boolean useCache);
+
+   int getMinimumMessageSize();
+
+   void setMinimumMessageSize(int minimumMessageSize);
+
+   int getCursorMemoryHighWaterMark();
+
+   void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
+
+   /**
+    * Optionally called by a subscriber to tell the destination that it is
+    * ready for more messages.
+    */
+   void wakeup();
+
+   /**
+    * @return true if lazyDispatch is enabled
+    */
+   boolean isLazyDispatch();
+
+   /**
+    * Sets the lazy dispatch flag; the default is false.
+    *
+    * @param value the new lazy dispatch setting
+    */
+   void setLazyDispatch(boolean value);
+
+   // ---- notification callbacks ----
+
+   /**
+    * Informs the destination that a message has expired.
+    *
+    * @param context the connection context
+    * @param subs    the subscription involved
+    * @param node    the expired message reference
+    */
+   void messageExpired(AMQConnectionContext context, AMQSubscription subs,
+         MessageReference node);
+
+   /**
+    * Called when a message is consumed.
+    *
+    * @param context          the connection context
+    * @param messageReference the consumed message reference
+    */
+   void messageConsumed(AMQConnectionContext context,
+         MessageReference messageReference);
+
+   /**
+    * Called when a message is delivered to the broker.
+    *
+    * @param context          the connection context
+    * @param messageReference the delivered message reference
+    */
+   void messageDelivered(AMQConnectionContext context,
+         MessageReference messageReference);
+
+   /**
+    * Called when a message is discarded, e.g. when running low on memory.
+    * This only happens if the policy is enabled, e.g. for non durable
+    * topics.
+    *
+    * @param context          the connection context
+    * @param sub              the subscription involved
+    * @param messageReference the discarded message reference
+    */
+   void messageDiscarded(AMQConnectionContext context, AMQSubscription sub,
+         MessageReference messageReference);
+
+   /**
+    * Called when there is a slow consumer.
+    *
+    * @param context the connection context
+    * @param subs    the slow subscription
+    */
+   void slowConsumer(AMQConnectionContext context, AMQSubscription subs);
+
+   /**
+    * Called to notify that a producer is too fast.
+    *
+    * @param context      the connection context
+    * @param producerInfo the fast producer
+    */
+   void fastProducer(AMQConnectionContext context, ProducerInfo producerInfo);
+
+   /**
+    * Called when a Usage reaches a limit.
+    *
+    * @param context the connection context
+    * @param usage   the usage that reached its limit
+    */
+   void isFull(AMQConnectionContext context, Usage<?> usage);
+
+   List<AMQSubscription> getConsumers();
+
+   /**
+    * Called on queues in slave mode so that dispatch follows the
+    * subscription choice made by the master.
+    *
+    * @param messageDispatchNotification the notification to process
+    * @throws Exception if processing fails
+    */
+   void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
+
+   boolean isPrioritizedMessages();
+
+   AMQSlowConsumerStrategy getSlowConsumerStrategy();
+
+   boolean isDoOptimzeMessageStorage();
+
+   void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
+
+   void clearPendingMessages();
+
+   boolean isDLQ();
+
+   void duplicateFromStore(Message message, AMQSubscription subscription);
+
+}

Reply via email to