http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestinationStatistics.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestinationStatistics.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestinationStatistics.java
new file mode 100644
index 0000000..035db4e
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQDestinationStatistics.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import org.apache.activemq.management.CountStatisticImpl;
+import org.apache.activemq.management.PollCountStatisticImpl;
+import org.apache.activemq.management.SizeStatisticImpl;
+import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.management.TimeStatisticImpl;
+
+public class AMQDestinationStatistics extends StatsImpl
+{
+
+   protected CountStatisticImpl enqueues;
+   protected CountStatisticImpl dequeues;
+   protected CountStatisticImpl consumers;
+   protected CountStatisticImpl producers;
+   protected CountStatisticImpl messages;
+   protected PollCountStatisticImpl messagesCached;
+   protected CountStatisticImpl dispatched;
+   protected CountStatisticImpl inflight;
+   protected CountStatisticImpl expired;
+   protected TimeStatisticImpl processTime;
+   protected CountStatisticImpl blockedSends;
+   protected TimeStatisticImpl blockedTime;
+   protected SizeStatisticImpl messageSize;
+
+   public AMQDestinationStatistics()
+   {
+
+      enqueues = new CountStatisticImpl("enqueues",
+            "The number of messages that have been sent to the destination");
+      dispatched = new CountStatisticImpl("dispatched",
+            "The number of messages that have been dispatched from the 
destination");
+      dequeues = new CountStatisticImpl("dequeues",
+            "The number of messages that have been acknowledged from the 
destination");
+      inflight = new CountStatisticImpl("inflight",
+            "The number of messages dispatched but awaiting acknowledgement");
+      expired = new CountStatisticImpl("expired",
+            "The number of messages that have expired");
+
+      consumers = new CountStatisticImpl(
+            "consumers",
+            "The number of consumers that that are subscribing to messages 
from the destination");
+      consumers.setDoReset(false);
+      producers = new CountStatisticImpl("producers",
+            "The number of producers that that are publishing messages to the 
destination");
+      producers.setDoReset(false);
+      messages = new CountStatisticImpl("messages",
+            "The number of messages that that are being held by the 
destination");
+      messages.setDoReset(false);
+      messagesCached = new PollCountStatisticImpl("messagesCached",
+            "The number of messages that are held in the destination's memory 
cache");
+      processTime = new TimeStatisticImpl("processTime",
+            "information around length of time messages are held by a 
destination");
+      blockedSends = new CountStatisticImpl("blockedSends",
+            "number of messages that have to wait for flow control");
+      blockedTime = new TimeStatisticImpl("blockedTime",
+            "amount of time messages are blocked for flow control");
+      messageSize = new SizeStatisticImpl("messageSize",
+            "Size of messages passing through the destination");
+      addStatistic("enqueues", enqueues);
+      addStatistic("dispatched", dispatched);
+      addStatistic("dequeues", dequeues);
+      addStatistic("inflight", inflight);
+      addStatistic("expired", expired);
+      addStatistic("consumers", consumers);
+      addStatistic("producers", producers);
+      addStatistic("messages", messages);
+      addStatistic("messagesCached", messagesCached);
+      addStatistic("processTime", processTime);
+      addStatistic("blockedSends", blockedSends);
+      addStatistic("blockedTime", blockedTime);
+      addStatistic("messageSize", messageSize);
+   }
+
+   public CountStatisticImpl getEnqueues()
+   {
+      return enqueues;
+   }
+
+   public CountStatisticImpl getDequeues()
+   {
+      return dequeues;
+   }
+
+   public CountStatisticImpl getInflight()
+   {
+      return inflight;
+   }
+
+   public CountStatisticImpl getExpired()
+   {
+      return expired;
+   }
+
+   public CountStatisticImpl getConsumers()
+   {
+      return consumers;
+   }
+
+   public CountStatisticImpl getProducers()
+   {
+      return producers;
+   }
+
+   public PollCountStatisticImpl getMessagesCached()
+   {
+      return messagesCached;
+   }
+
+   public CountStatisticImpl getMessages()
+   {
+      return messages;
+   }
+
+   public void setMessagesCached(PollCountStatisticImpl messagesCached)
+   {
+      this.messagesCached = messagesCached;
+   }
+
+   public CountStatisticImpl getDispatched()
+   {
+      return dispatched;
+   }
+
+   public TimeStatisticImpl getProcessTime()
+   {
+      return this.processTime;
+   }
+
+   public CountStatisticImpl getBlockedSends()
+   {
+      return this.blockedSends;
+   }
+
+   public TimeStatisticImpl getBlockedTime()
+   {
+      return this.blockedTime;
+   }
+
+   public SizeStatisticImpl getMessageSize()
+   {
+      return this.messageSize;
+   }
+
+   /**
+    * Resets statistics, but only when {@code isDoReset()} reports that
+    * resetting is enabled for this object.
+    * <p>
+    * After delegating to {@code super.reset()}, the enqueues, dequeues,
+    * dispatched, inflight, expired, blockedSends, blockedTime and messageSize
+    * statistics are reset explicitly. The consumers, producers, messages,
+    * messagesCached and processTime statistics are not reset explicitly here.
+    */
+   public void reset()
+   {
+      if (this.isDoReset())
+      {
+         super.reset();
+         enqueues.reset();
+         dequeues.reset();
+         dispatched.reset();
+         inflight.reset();
+         expired.reset();
+         blockedSends.reset();
+         blockedTime.reset();
+         messageSize.reset();
+      }
+   }
+
+   /**
+    * Forwards the enabled flag to the superclass and to each individual
+    * statistic held by this object.
+    *
+    * @param enabled the flag passed on to every statistic except
+    *           {@code expired}
+    */
+   public void setEnabled(boolean enabled)
+   {
+      super.setEnabled(enabled);
+      enqueues.setEnabled(enabled);
+      dispatched.setEnabled(enabled);
+      dequeues.setEnabled(enabled);
+      inflight.setEnabled(enabled);
+      // NOTE(review): expired is always enabled with a literal true, ignoring
+      // the argument -- confirm whether this is intentional.
+      expired.setEnabled(true);
+      consumers.setEnabled(enabled);
+      producers.setEnabled(enabled);
+      messages.setEnabled(enabled);
+      messagesCached.setEnabled(enabled);
+      processTime.setEnabled(enabled);
+      blockedSends.setEnabled(enabled);
+      blockedTime.setEnabled(enabled);
+      messageSize.setEnabled(enabled);
+
+   }
+
+   public void setParent(AMQDestinationStatistics parent)
+   {
+      if (parent != null)
+      {
+         enqueues.setParent(parent.enqueues);
+         dispatched.setParent(parent.dispatched);
+         dequeues.setParent(parent.dequeues);
+         inflight.setParent(parent.inflight);
+         expired.setParent(parent.expired);
+         consumers.setParent(parent.consumers);
+         producers.setParent(parent.producers);
+         messagesCached.setParent(parent.messagesCached);
+         messages.setParent(parent.messages);
+         processTime.setParent(parent.processTime);
+         blockedSends.setParent(parent.blockedSends);
+         blockedTime.setParent(parent.blockedTime);
+         messageSize.setParent(parent.messageSize);
+      }
+      else
+      {
+         enqueues.setParent(null);
+         dispatched.setParent(null);
+         dequeues.setParent(null);
+         inflight.setParent(null);
+         expired.setParent(null);
+         consumers.setParent(null);
+         producers.setParent(null);
+         messagesCached.setParent(null);
+         messages.setParent(null);
+         processTime.setParent(null);
+         blockedSends.setParent(null);
+         blockedTime.setParent(null);
+         messageSize.setParent(null);
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java
new file mode 100644
index 0000000..d9c4495
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMapTransportConnectionStateRegister.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.SessionId;
+
+public class AMQMapTransportConnectionStateRegister implements
+      AMQTransportConnectionStateRegister
+{
+
+   private Map<ConnectionId, AMQTransportConnectionState> connectionStates = 
new ConcurrentHashMap<ConnectionId, AMQTransportConnectionState>();
+
+   /**
+    * Stores the given state under the given connection id.
+    *
+    * @param connectionId key to register the state under
+    * @param state the state to register
+    * @return whatever the backing map's {@code put} returns, i.e. the state
+    *         previously stored under that id, or {@code null} if there was none
+    */
+   public AMQTransportConnectionState registerConnectionState(
+         ConnectionId connectionId, AMQTransportConnectionState state)
+   {
+      return connectionStates.put(connectionId, state);
+   }
+
+   /**
+    * Removes the state registered under the given connection id.
+    * <p>
+    * If the removed state's reference counter is greater than one, the
+    * reference is decremented and the state is put back into the map, so the
+    * entry only disappears when the last reference is released.
+    *
+    * @param connectionId id of the connection to unregister
+    * @return the state that was registered under the id, or {@code null} if
+    *         nothing was registered
+    */
+   public AMQTransportConnectionState unregisterConnectionState(
+         ConnectionId connectionId)
+   {
+      AMQTransportConnectionState rc = connectionStates.remove(connectionId);
+      if (rc == null)
+      {
+         // Unknown (or already unregistered) id: nothing to release. Without
+         // this guard the reference-counter access below would throw an NPE.
+         return null;
+      }
+      if (rc.getReferenceCounter().get() > 1)
+      {
+         rc.decrementReference();
+         connectionStates.put(connectionId, rc);
+      }
+      return rc;
+   }
+
+   /**
+    * Copies the currently registered states into a new list.
+    *
+    * @return a new {@link ArrayList} holding the values of the backing map
+    */
+   public List<AMQTransportConnectionState> listConnectionStates()
+   {
+      return new ArrayList<AMQTransportConnectionState>(connectionStates.values());
+   }
+
+   public AMQTransportConnectionState lookupConnectionState(String 
connectionId)
+   {
+      return connectionStates.get(new ConnectionId(connectionId));
+   }
+
+   public AMQTransportConnectionState lookupConnectionState(ConsumerId id)
+   {
+      AMQTransportConnectionState cs = lookupConnectionState(id
+            .getConnectionId());
+      if (cs == null)
+      {
+         throw new IllegalStateException(
+               "Cannot lookup a consumer from a connection that had not been 
registered: "
+                     + id.getParentId().getParentId());
+      }
+      return cs;
+   }
+
+   public AMQTransportConnectionState lookupConnectionState(ProducerId id)
+   {
+      AMQTransportConnectionState cs = lookupConnectionState(id
+            .getConnectionId());
+      if (cs == null)
+      {
+         throw new IllegalStateException(
+               "Cannot lookup a producer from a connection that had not been 
registered: "
+                     + id.getParentId().getParentId());
+      }
+      return cs;
+   }
+
+   public AMQTransportConnectionState lookupConnectionState(SessionId id)
+   {
+      AMQTransportConnectionState cs = lookupConnectionState(id
+            .getConnectionId());
+      if (cs == null)
+      {
+         throw new IllegalStateException(
+               "Cannot lookup a session from a connection that had not been 
registered: "
+                     + id.getParentId());
+      }
+      return cs;
+   }
+
+   public AMQTransportConnectionState lookupConnectionState(
+         ConnectionId connectionId)
+   {
+      AMQTransportConnectionState cs = connectionStates.get(connectionId);
+      if (cs == null)
+      {
+         throw new IllegalStateException(
+               "Cannot lookup a connection that had not been registered: "
+                     + connectionId);
+      }
+      return cs;
+   }
+
+   public boolean doesHandleMultipleConnectionStates()
+   {
+      return true;
+   }
+
+   public boolean isEmpty()
+   {
+      return connectionStates.isEmpty();
+   }
+
+   public void clear()
+   {
+      connectionStates.clear();
+
+   }
+
+   /**
+    * Replaces the contents of this register with the states of another one.
+    *
+    * @param other the register whose {@code mapStates()} result is copied in
+    */
+   public void intialize(AMQTransportConnectionStateRegister other)
+   {
+      // Take the snapshot BEFORE clearing: if 'other' is this register (or is
+      // backed by the same map) clearing first would leave nothing to copy.
+      Map<ConnectionId, AMQTransportConnectionState> snapshot = other.mapStates();
+      connectionStates.clear();
+      connectionStates.putAll(snapshot);
+   }
+
+   /**
+    * Copies the registered states into a new map.
+    *
+    * @return a new {@link HashMap} populated from the backing map
+    */
+   public Map<ConnectionId, AMQTransportConnectionState> mapStates()
+   {
+      return new HashMap<ConnectionId, AMQTransportConnectionState>(connectionStates);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageAuthorizationPolicy.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageAuthorizationPolicy.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageAuthorizationPolicy.java
new file mode 100644
index 0000000..6cdd593
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageAuthorizationPolicy.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import org.apache.activemq.command.Message;
+
+public interface AMQMessageAuthorizationPolicy
+{
+
+   /**
+    * Decides whether the given message may be consumed in the given
+    * connection context.
+    *
+    * @param context the connection context the decision is made for
+    * @param message the message being considered
+    * @return true if the context is allowed to consume the message
+    */
+   boolean isAllowedToConsume(AMQConnectionContext context, Message message);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageStore.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageStore.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageStore.java
new file mode 100644
index 0000000..df62d62
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQMessageStore.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+/**
+ * Empty interface: it currently declares no members and only serves as a
+ * type name.
+ */
+public interface AMQMessageStore
+{
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQPersistenceAdapter.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQPersistenceAdapter.java
new file mode 100644
index 0000000..f717c59
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQPersistenceAdapter.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ProducerId;
+
+public class AMQPersistenceAdapter
+{
+
+   /**
+    * Returns a set of all the {@link 
org.apache.activemq.command.ActiveMQDestination}
+    * objects that the persistence store is aware exist.
+    *
+    * @return active destinations
+    */
+   Set<ActiveMQDestination> getDestinations()
+   {
+      return null;
+   }
+
+   /**
+    * Factory method to create a new queue message store with the given 
destination name
+    * @param destination
+    * @return the message store
+    * @throws IOException
+    */
+   AMQMessageStore createQueueMessageStore(ActiveMQQueue destination) throws 
IOException
+   {
+      return null;
+   }
+
+   /**
+    * Returns the last stored producer sequence id for the given producer id.
+    * The value is meant to suppress duplicate sends at the transport when a
+    * failover reconnect occurs.
+    * <p>
+    * NOTE(review): this implementation ignores {@code id} and always returns
+    * {@code 0}, while the contract below names {@code -1} as the
+    * "no suppression needed" value -- confirm which one callers expect.
+    *
+    * @param id the producerId to find a sequenceId for
+    * @return the last stored sequence id or -1 if no suppression needed
+    */
+   public long getLastProducerSequenceId(ProducerId id)
+   {
+      return 0;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducer.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducer.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducer.java
new file mode 100644
index 0000000..16fcf74
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducer.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import org.apache.activemq.command.ProducerInfo;
+
+/**
+ * Holds the session and the {@link ProducerInfo} a producer was created with.
+ * Both references are fixed at construction time.
+ */
+public class AMQProducer
+{
+   // Assigned once in the constructor and never changed afterwards.
+   private final AMQSession amqSession;
+   private final ProducerInfo info;
+
+   /**
+    * @param amqSession the session this producer is associated with
+    * @param info the producer info this producer is associated with
+    */
+   public AMQProducer(AMQSession amqSession, ProducerInfo info)
+   {
+      this.amqSession = amqSession;
+      this.info = info;
+   }
+
+   /**
+    * Intentionally does nothing.
+    */
+   public void init()
+   {
+      //hornetq doesn't have producer at server.
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
new file mode 100644
index 0000000..9191ee1
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.state.ProducerState;
+
+public class AMQProducerBrokerExchange
+{
+   private AMQConnectionContext connectionContext;
+   private AMQDestination regionDestination;
+   private ProducerState producerState;
+   private boolean mutable = true;
+   private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
+   private boolean auditProducerSequenceIds;
+   private boolean isNetworkProducer;
+   private final FlowControlInfo flowControlInfo = new FlowControlInfo();
+
+   public AMQProducerBrokerExchange()
+   {
+   }
+
+   /**
+    * Creates a new exchange from this one.
+    * <p>
+    * The new exchange receives the result of {@code connectionContext.copy()},
+    * the same {@code regionDestination} and {@code producerState} references,
+    * and the current {@code mutable} flag. The remaining fields
+    * ({@code lastSendSequenceNumber}, {@code auditProducerSequenceIds},
+    * {@code isNetworkProducer}, {@code flowControlInfo}) are not carried over
+    * and keep the initial values of a freshly constructed exchange.
+    * <p>
+    * The connection context is dereferenced unconditionally, so it must have
+    * been set before this method is called.
+    *
+    * @return the new exchange
+    */
+   public AMQProducerBrokerExchange copy()
+   {
+      AMQProducerBrokerExchange rc = new AMQProducerBrokerExchange();
+      rc.connectionContext = connectionContext.copy();
+      rc.regionDestination = regionDestination;
+      rc.producerState = producerState;
+      rc.mutable = mutable;
+      return rc;
+   }
+
+   /**
+    * @return the connectionContext
+    */
+   public AMQConnectionContext getConnectionContext()
+   {
+      return this.connectionContext;
+   }
+
+   /**
+    * @param connectionContext
+    *           the connectionContext to set
+    */
+   public void setConnectionContext(AMQConnectionContext connectionContext)
+   {
+      this.connectionContext = connectionContext;
+   }
+
+   /**
+    * @return the mutable
+    */
+   public boolean isMutable()
+   {
+      return this.mutable;
+   }
+
+   /**
+    * @param mutable
+    *           the mutable to set
+    */
+   public void setMutable(boolean mutable)
+   {
+      this.mutable = mutable;
+   }
+
+   /**
+    * @return the regionDestination
+    */
+   public AMQDestination getRegionDestination()
+   {
+      return this.regionDestination;
+   }
+
+   /**
+    * @param regionDestination
+    *           the regionDestination to set
+    */
+   public void setRegionDestination(AMQDestination regionDestination)
+   {
+      this.regionDestination = regionDestination;
+   }
+
+   /**
+    * @return the producerState
+    */
+   public ProducerState getProducerState()
+   {
+      return this.producerState;
+   }
+
+   /**
+    * @param producerState
+    *           the producerState to set
+    */
+   public void setProducerState(ProducerState producerState)
+   {
+      this.producerState = producerState;
+   }
+
+   /**
+    * Enforce duplicate suppression using info from persistence adapter
+    *
+    * @return false if message should be ignored as a duplicate
+    */
+   public boolean canDispatch(Message messageSend)
+   {
+      boolean canDispatch = true;
+      if (auditProducerSequenceIds && messageSend.isPersistent())
+      {
+         final long producerSequenceId = messageSend.getMessageId()
+               .getProducerSequenceId();
+         if (isNetworkProducer)
+         {
+            // messages are multiplexed on this producer so we need to query 
the
+            // persistenceAdapter
+            long lastStoredForMessageProducer = 
getStoredSequenceIdForMessage(messageSend
+                  .getMessageId());
+            if (producerSequenceId <= lastStoredForMessageProducer)
+            {
+               canDispatch = false;
+            }
+         }
+         else if (producerSequenceId <= lastSendSequenceNumber.get())
+         {
+            canDispatch = false;
+            if (messageSend.isInTransaction())
+            {
+            }
+            else
+            {
+            }
+         }
+         else
+         {
+            // track current so we can suppress duplicates later in the stream
+            lastSendSequenceNumber.set(producerSequenceId);
+         }
+      }
+      return canDispatch;
+   }
+
+   private long getStoredSequenceIdForMessage(MessageId messageId)
+   {
+      return -1;
+   }
+
+   public void setLastStoredSequenceId(long l)
+   {
+   }
+
+   public void incrementSend()
+   {
+      flowControlInfo.incrementSend();
+   }
+
+   public void blockingOnFlowControl(boolean blockingOnFlowControl)
+   {
+      flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
+   }
+
+   public void incrementTimeBlocked(AMQDestination destination, long 
timeBlocked)
+   {
+      flowControlInfo.incrementTimeBlocked(timeBlocked);
+   }
+
+   public boolean isBlockedForFlowControl()
+   {
+      return flowControlInfo.isBlockingOnFlowControl();
+   }
+
+   public void resetFlowControl()
+   {
+      flowControlInfo.reset();
+   }
+
+   public long getTotalTimeBlocked()
+   {
+      return flowControlInfo.getTotalTimeBlocked();
+   }
+
+   /**
+    * Computes the share of sends that were blocked by flow control.
+    *
+    * @return {@code sendsBlocked * 100 / totalSends}, truncated to an int, or
+    *         {@code 0} when no sends have been counted yet
+    */
+   public int getPercentageBlocked()
+   {
+      final long totalSends = flowControlInfo.getTotalSends();
+      if (totalSends <= 0)
+      {
+         // Nothing sent yet: avoid dividing by zero.
+         return 0;
+      }
+      // Multiply before dividing so the long division does not truncate the
+      // ratio to 0 before it is scaled to a percentage.
+      return (int) (flowControlInfo.getSendsBlocked() * 100 / totalSends);
+   }
+
+   public static class FlowControlInfo
+   {
+      private AtomicBoolean blockingOnFlowControl = new AtomicBoolean();
+      private AtomicLong totalSends = new AtomicLong();
+      private AtomicLong sendsBlocked = new AtomicLong();
+      private AtomicLong totalTimeBlocked = new AtomicLong();
+
+      public boolean isBlockingOnFlowControl()
+      {
+         return blockingOnFlowControl.get();
+      }
+
+      public void setBlockingOnFlowControl(boolean blockingOnFlowControl)
+      {
+         this.blockingOnFlowControl.set(blockingOnFlowControl);
+         if (blockingOnFlowControl)
+         {
+            incrementSendBlocked();
+         }
+      }
+
+      public long getTotalSends()
+      {
+         return totalSends.get();
+      }
+
+      public void incrementSend()
+      {
+         this.totalSends.incrementAndGet();
+      }
+
+      public long getSendsBlocked()
+      {
+         return sendsBlocked.get();
+      }
+
+      public void incrementSendBlocked()
+      {
+         this.sendsBlocked.incrementAndGet();
+      }
+
+      public long getTotalTimeBlocked()
+      {
+         return totalTimeBlocked.get();
+      }
+
+      public void incrementTimeBlocked(long time)
+      {
+         this.totalTimeBlocked.addAndGet(time);
+      }
+
+      public void reset()
+      {
+         blockingOnFlowControl.set(false);
+         totalSends.set(0);
+         sendsBlocked.set(0);
+         totalTimeBlocked.set(0);
+
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSecurityContext.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSecurityContext.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSecurityContext.java
new file mode 100644
index 0000000..7efd30b
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSecurityContext.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.security.Principal;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.command.ActiveMQDestination;
+
+public abstract class AMQSecurityContext
+{
+
+   public static final AMQSecurityContext BROKER_SECURITY_CONTEXT = new 
AMQSecurityContext(
+         "ActiveMQBroker")
+   {
+      @Override
+      public boolean isBrokerContext()
+      {
+         return true;
+      }
+
+      public Set<Principal> getPrincipals()
+      {
+         return Collections.emptySet();
+      }
+   };
+
+   final String userName;
+
+   final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> 
authorizedReadDests = new ConcurrentHashMap<ActiveMQDestination, 
ActiveMQDestination>();
+   final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> 
authorizedWriteDests = new ConcurrentHashMap<ActiveMQDestination, 
ActiveMQDestination>();
+
+   public AMQSecurityContext(String userName)
+   {
+      this.userName = userName;
+   }
+
+   /**
+    * Checks whether at least one of the given principals is among the
+    * principals returned by {@link #getPrincipals()}.
+    * <p>
+    * Membership is tested with {@link Set#contains(Object)} on a hash set copy
+    * of the user's principals, so a {@code null} element in
+    * {@code allowedPrincipals} simply does not match.
+    *
+    * @param allowedPrincipals the principals that grant access
+    * @return true if any allowed principal equals one of the user's principals
+    */
+   public boolean isInOneOf(Set<?> allowedPrincipals)
+   {
+      Set<Object> userPrincipals = new HashSet<Object>(getPrincipals());
+      for (Object allowedPrincipal : allowedPrincipals)
+      {
+         if (userPrincipals.contains(allowedPrincipal))
+         {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   public abstract Set<Principal> getPrincipals();
+
+   public String getUserName()
+   {
+      return userName;
+   }
+
+   public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> 
getAuthorizedReadDests()
+   {
+      return authorizedReadDests;
+   }
+
+   public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> 
getAuthorizedWriteDests()
+   {
+      return authorizedWriteDests;
+   }
+
+   public boolean isBrokerContext()
+   {
+      return false;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerConsumer.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerConsumer.java
new file mode 100644
index 0000000..2954142
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerConsumer.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.util.List;
+
+import org.hornetq.core.filter.Filter;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.QueueBinding;
+import org.hornetq.core.protocol.openwire.OpenWireMessageConverter;
+import org.hornetq.core.server.HandleStatus;
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.server.impl.ServerConsumerImpl;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+public class AMQServerConsumer extends ServerConsumerImpl
+{
+
+   /**
+    * Creates the consumer by handing every argument, unchanged and in the same
+    * order, to the {@link ServerConsumerImpl} constructor.
+    *
+    * @throws Exception whatever the superclass constructor throws
+    */
+   public AMQServerConsumer(long consumerID, AMQServerSession serverSession,
+         QueueBinding binding, Filter filter, boolean started,
+         boolean browseOnly, StorageManager storageManager,
+         SessionCallback callback, boolean preAcknowledge,
+         boolean strictUpdateDeliveryCount,
+         ManagementService managementService, boolean supportLargeMessage,
+         Integer credits) throws Exception
+   {
+      super(consumerID, serverSession, binding, filter, started, browseOnly, 
storageManager,
+            callback, preAcknowledge, strictUpdateDeliveryCount, 
managementService,
+            supportLargeMessage, credits);
+   }
+
+   /**
+    * Swaps the current browser deliverer for an {@link AMQBrowserDeliverer}
+    * built from it and attaches the given listener to the replacement.
+    *
+    * @param listener the listener the new deliverer calls when its iterator
+    *                 has no more references
+    */
+   public void setBrowserListener(BrowserListener listener)
+   {
+      AMQBrowserDeliverer replacement = new AMQBrowserDeliverer(this.browserDeliverer);
+      replacement.listener = listener;
+      this.browserDeliverer = replacement;
+   }
+
+   /**
+    * Browser deliverer that walks the iterator taken from an existing
+    * {@link BrowserDeliverer} and, once that iterator has no more references,
+    * calls {@code browseFinished()} on the attached listener (if one is set).
+    */
+   private class AMQBrowserDeliverer extends BrowserDeliverer
+   {
+      // Assigned by setBrowserListener(); when left null no end-of-browse callback is made.
+      private BrowserListener listener = null;
+
+      // Passes the iterator of the given deliverer to the superclass constructor.
+      public AMQBrowserDeliverer(final BrowserDeliverer other)
+      {
+         super(other.iterator);
+      }
+
+      /**
+       * First retries the reference left over from a previous BUSY result, then
+       * hands out references from the iterator until it is exhausted, a
+       * reference is reported BUSY, or an exception is logged.
+       */
+      @Override
+      public synchronized void run()
+      {
+         // if the reference was busy during the previous iteration, handle it now
+         if (current != null)
+         {
+            try
+            {
+               HandleStatus status = handle(current);
+
+               // still busy: keep 'current' and try again on the next run
+               if (status == HandleStatus.BUSY)
+               {
+                  return;
+               }
+
+               if (status == HandleStatus.HANDLED)
+               {
+                  proceedDeliver(current);
+               }
+
+               current = null;
+            }
+            catch (Exception e)
+            {
+               // 'current' is not cleared on this path, so the same reference is retried next run
+               HornetQServerLogger.LOGGER.errorBrowserHandlingMessage(e, 
current);
+               return;
+            }
+         }
+
+         MessageReference ref = null;
+         HandleStatus status;
+
+         while (true)
+         {
+            try
+            {
+               ref = null;
+               // hasNext/next/handle run while holding the messageQueue monitor;
+               // proceedDeliver below runs after it has been released
+               synchronized (messageQueue)
+               {
+                  if (!iterator.hasNext())
+                  {
+                     //here we need to send a null for amq browsers
+                     if (listener != null)
+                     {
+                        listener.browseFinished();
+                     }
+                     break;
+                  }
+
+                  ref = iterator.next();
+
+                  status = handle(ref);
+               }
+
+               // any status other than HANDLED or BUSY simply moves on to the next reference
+               if (status == HandleStatus.HANDLED)
+               {
+                  proceedDeliver(ref);
+               }
+               else if (status == HandleStatus.BUSY)
+               {
+                  // keep a reference on the current message reference
+                  // to handle it next time the browser deliverer is executed
+                  current = ref;
+                  break;
+               }
+            }
+            catch (Exception e)
+            {
+               HornetQServerLogger.LOGGER.errorBrowserHandlingMessage(e, ref);
+               break;
+            }
+         }
+      }
+   }
+
+   /**
+    * Puts the given references back into the delivering list, incrementing the
+    * delivery count of each, and then rotates the list until the first of the
+    * returned references is at its head.
+    *
+    * @param refs the references to put back, in the order they must be redelivered
+    */
+   public void amqPutBackToDeliveringList(final List<MessageReference> refs)
+   {
+      synchronized (this.deliveringRefs)
+      {
+         for (MessageReference returned : refs)
+         {
+            returned.incrementDeliveryCount();
+            deliveringRefs.add(returned);
+         }
+
+         if (refs.isEmpty())
+         {
+            return;
+         }
+
+         // Fix up the order: with m1, m2 already present and m3, m4, m5 returned,
+         // the result must be m3, m4, m5, m1, m2. Head entries are moved to the
+         // back one at a time until the first returned reference is at the head.
+         final long firstReturnedId = refs.get(0).getMessage().getMessageID();
+         MessageReference head = deliveringRefs.peek();
+         while (head.getMessage().getMessageID() != firstReturnedId)
+         {
+            deliveringRefs.poll();
+            deliveringRefs.add(head);
+            head = deliveringRefs.peek();
+         }
+      }
+   }
+
+   /**
+    * Removes the reference with the given message id from this consumer and
+    * sends it to the dead letter address of its queue. When a cause is given,
+    * its string form is stored on the message under
+    * {@code AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY} first.
+    *
+    * @param mid   id of the message to move
+    * @param cause reason for the move; may be {@code null}, in which case no
+    *              failure-cause property is written
+    * @throws IllegalStateException if this consumer holds no reference for {@code mid}
+    * @throws Exception if sending to the dead letter address fails
+    */
+   public void moveToDeadLetterAddress(long mid, Throwable cause) throws Exception
+   {
+      MessageReference ref = removeReferenceByID(mid);
+
+      if (ref == null)
+      {
+         throw new IllegalStateException("Cannot find ref to ack " + mid);
+      }
+
+      // The reference has already been removed from this consumer at this point.
+      // A null cause must not abort the move with a NullPointerException, or the
+      // reference would never reach the dead letter address.
+      if (cause != null)
+      {
+         ServerMessage coreMsg = ref.getMessage();
+         coreMsg.putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
+                                   cause.toString());
+      }
+
+      QueueImpl queue = (QueueImpl)ref.getQueue();
+      synchronized (queue)
+      {
+         queue.sendToDeadLetterAddress(ref);
+         queue.decDelivering();
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSession.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSession.java
new file mode 100644
index 0000000..0711f01
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSession.java
@@ -0,0 +1,451 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.management.CoreNotificationType;
+import org.hornetq.api.core.management.ManagementHelper;
+import org.hornetq.core.filter.Filter;
+import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.postoffice.QueueBinding;
+import org.hornetq.core.protocol.openwire.AMQTransactionImpl;
+import org.hornetq.core.security.SecurityStore;
+import org.hornetq.core.server.HornetQMessageBundle;
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerConsumer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.RefsOperation;
+import org.hornetq.core.server.impl.ServerConsumerImpl;
+import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUID;
+
+public class AMQServerSession extends ServerSessionImpl
+{
+   // When true, createConsumer(), createQueue() and doSend() run the code paths written
+   // in this class instead of delegating to the superclass implementations.
+   private boolean internal;
+
+   /**
+    * Creates a session by passing all arguments through to the
+    * {@link ServerSessionImpl} constructor, adding a new
+    * {@code AMQTransactionFactory} as the final argument.
+    *
+    * @throws Exception whatever the superclass constructor throws
+    */
+   public AMQServerSession(String name, String username, String password,
+         int minLargeMessageSize, boolean autoCommitSends,
+         boolean autoCommitAcks, boolean preAcknowledge,
+         boolean persistDeliveryCountBeforeDelivery, boolean xa,
+         RemotingConnection connection, StorageManager storageManager,
+         PostOffice postOffice, ResourceManager resourceManager,
+         SecurityStore securityStore, ManagementService managementService,
+         HornetQServerImpl hornetQServerImpl, SimpleString managementAddress,
+         SimpleString simpleString, SessionCallback callback,
+         OperationContext context) throws Exception
+   {
+      super(name, username, password,
+         minLargeMessageSize, autoCommitSends,
+         autoCommitAcks, preAcknowledge,
+         persistDeliveryCountBeforeDelivery, xa,
+         connection, storageManager,
+         postOffice, resourceManager,
+         securityStore, managementService,
+         hornetQServerImpl, managementAddress,
+         simpleString, callback,
+         context, new AMQTransactionFactory());
+   }
+
+   /**
+    * Creates a placeholder session that carries only credentials; intended
+    * solely for running a security check.
+    *
+    * @param user user name passed to the superclass
+    * @param pass password passed to the superclass
+    */
+   public AMQServerSession(String user, String pass)
+   {
+      super(user, pass);
+   }
+
+   protected void doClose(final boolean failed) throws Exception
+   {
+      synchronized (this)
+      {
+         if (tx != null && tx.getXid() == null)
+         {
+            ((AMQTransactionImpl)tx).setRollbackForClose();
+         }
+      }
+      super.doClose(failed);
+   }
+
+   /**
+    * Looks up the available-credits counter of a consumer of this session.
+    *
+    * @param consumerID id of the consumer
+    * @return the consumer's available credits, or {@code null} (after a debug
+    *         log entry) when no consumer is registered under that id
+    */
+   public AtomicInteger getConsumerCredits(final long consumerID)
+   {
+      final ServerConsumer found = consumers.get(consumerID);
+
+      if (found != null)
+      {
+         return ((ServerConsumerImpl) found).getAvailableCredits();
+      }
+
+      HornetQServerLogger.LOGGER.debug("There is no consumer with id " + consumerID);
+
+      return null;
+   }
+
+   /**
+    * Switches this session to XA mode. Does nothing if it is already XA.
+    * Otherwise any current transaction is rolled back and dropped, both
+    * auto-commit flags are cleared, and the XA flag is set.
+    */
+   public void enableXA() throws Exception
+   {
+      if (this.xa)
+      {
+         return;
+      }
+
+      if (this.tx != null)
+      {
+         //that's not expected, maybe a warning.
+         this.tx.rollback();
+         this.tx = null;
+      }
+
+      this.autoCommitAcks = false;
+      this.autoCommitSends = false;
+
+      this.xa = true;
+   }
+
+   /**
+    * Switches this session to local-transaction mode: clears both auto-commit
+    * flags, rolls back and drops any current transaction, then starts a new one.
+    *
+    * @throws IllegalStateException if the session is XA
+    */
+   public void enableTx() throws Exception
+   {
+      if (xa)
+      {
+         throw new IllegalStateException("Session is XA");
+      }
+
+      autoCommitAcks = false;
+      autoCommitSends = false;
+
+      if (tx != null)
+      {
+         //that's not expected, maybe a warning.
+         tx.rollback();
+         tx = null;
+      }
+
+      tx = newTransaction();
+   }
+
+   /**
+    * AMQ-specific rollback of the current transaction.
+    * <p>
+    * Before rolling back, the references recorded in the transaction's
+    * {@code REFS_OPERATION} property are sorted out: a reference whose consumer
+    * is still registered and whose message id is in {@code acked} is put back
+    * into that consumer's delivering list; a reference whose consumer is no
+    * longer registered is cancelled back to its queue. Afterwards the
+    * transaction is rolled back and replaced by a new one, or by {@code null}
+    * when the session is XA.
+    *
+    * @param acked ids of the messages that are to be put back to their consumers
+    */
+   public void amqRollback(Set<Long> acked) throws Exception
+   {
+      if (tx == null)
+      {
+         // Might be null if XA
+
+         tx = newTransaction();
+      }
+
+      RefsOperation oper = (RefsOperation) 
tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+
+      if (oper != null)
+      {
+         List<MessageReference> ackRefs = oper.getReferencesToAcknowledge();
+         // consumer id -> references to hand back to that consumer
+         Map<Long, List<MessageReference>> toAcks = new HashMap<Long, 
List<MessageReference>>();
+         for (MessageReference ref : ackRefs)
+         {
+            Long consumerId = ref.getConsumerId();
+
+            if (this.consumers.containsKey(consumerId))
+            {
+               // references not listed in 'acked' are not collected here
+               if (acked.contains(ref.getMessage().getMessageID()))
+               {
+                  List<MessageReference> ackList = toAcks.get(consumerId);
+                  if (ackList == null)
+                  {
+                     ackList = new ArrayList<MessageReference>();
+                     toAcks.put(consumerId, ackList);
+                  }
+                  ackList.add(ref);
+               }
+            }
+            else
+            {
+               //consumer must have been closed, cancel to queue
+               ref.getQueue().cancel(tx, ref);
+            }
+         }
+         //iterate consumers
+         if (toAcks.size() > 0)
+         {
+            Iterator<Entry<Long, List<MessageReference>>> iter = 
toAcks.entrySet().iterator();
+            while (iter.hasNext())
+            {
+               Entry<Long, List<MessageReference>> entry = iter.next();
+               ServerConsumer consumer = consumers.get(entry.getKey());
+               
((AMQServerConsumer)consumer).amqPutBackToDeliveringList(entry.getValue());
+            }
+         }
+      }
+
+      tx.rollback();
+
+      // XA sessions are left without a transaction; others start a fresh one right away
+      if (xa)
+      {
+         tx = null;
+      }
+      else
+      {
+         tx = newTransaction();
+      }
+
+   }
+
+   /**
+    * The failed flag is used here to control delivery count.
+    * If set to true the delivery count won't decrement.
+    */
+   public void amqCloseConsumer(long consumerID, boolean failed) throws 
Exception
+   {
+      final ServerConsumer consumer = consumers.get(consumerID);
+
+      if (consumer != null)
+      {
+         consumer.close(failed);
+      }
+      else
+      {
+         HornetQServerLogger.LOGGER.cannotFindConsumer(consumerID);
+      }
+   }
+
+   /**
+    * Creates a consumer on the given queue.
+    * <p>
+    * For a non-internal session this delegates to the superclass. For an
+    * internal session the consumer is created directly on the local queue
+    * binding, registered in {@code consumers} and, unless it is browse-only,
+    * announced with a {@code CONSUMER_CREATED} management notification.
+    *
+    * @return the newly created consumer
+    * @throws Exception if the queue has no local queue binding (internal
+    *                   sessions), or if creation fails
+    */
+   @Override
+   public ServerConsumer createConsumer(final long consumerID,
+                              final SimpleString queueName,
+                              final SimpleString filterString,
+                              final boolean browseOnly,
+                              final boolean supportLargeMessage,
+                              final Integer credits) throws Exception
+   {
+      if (this.internal)
+      {
+         // internal sessions skip the security check
+
+         Binding binding = postOffice.getBinding(queueName);
+
+         if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
+         {
+            throw HornetQMessageBundle.BUNDLE.noSuchQueue(queueName);
+         }
+
+         Filter filter = FilterImpl.createFilter(filterString);
+
+         ServerConsumer consumer = newConsumer(consumerID, this,
+               (QueueBinding) binding, filter, started, browseOnly,
+               storageManager, callback, preAcknowledge,
+               strictUpdateDeliveryCount, managementService,
+               supportLargeMessage, credits);
+         consumers.put(consumer.getID(), consumer);
+
+         // browse-only consumers are not announced
+         if (!browseOnly)
+         {
+            TypedProperties props = new TypedProperties();
+
+            props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS,
+                  binding.getAddress());
+
+            props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME,
+                  binding.getClusterName());
+
+            props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME,
+                  binding.getRoutingName());
+
+            props.putIntProperty(ManagementHelper.HDR_DISTANCE,
+                  binding.getDistance());
+
+            Queue theQueue = (Queue) binding.getBindable();
+
+            props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT,
+                  theQueue.getConsumerCount());
+
+            // HORNETQ-946
+            props.putSimpleStringProperty(ManagementHelper.HDR_USER,
+                  SimpleString.toSimpleString(username));
+
+            props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS,
+                  SimpleString.toSimpleString(this.remotingConnection
+                        .getRemoteAddress()));
+
+            props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME,
+                  SimpleString.toSimpleString(name));
+
+            if (filterString != null)
+            {
+               props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+                     filterString);
+            }
+
+            Notification notification = new Notification(null,
+                  CoreNotificationType.CONSUMER_CREATED, props);
+
+            if (HornetQServerLogger.LOGGER.isDebugEnabled())
+            {
+               HornetQServerLogger.LOGGER.debug("Session with user=" + username
+                     + ", connection=" + this.remotingConnection
+                     + " created a consumer on queue " + queueName
+                     + ", filter = " + filterString);
+            }
+
+            managementService.sendNotification(notification);
+         }
+
+         return consumer;
+      }
+      else
+      {
+         return super.createConsumer(consumerID, queueName, filterString, 
browseOnly, supportLargeMessage, credits);
+      }
+   }
+
+   /**
+    * Creates a queue bound to the given address.
+    * <p>
+    * For a non-internal session this delegates to the superclass. For an
+    * internal session the queue is created directly on the server; a temporary
+    * queue additionally gets a {@code TempQueueCleanerUpper} registered as
+    * close and failure listener on the remoting connection.
+    *
+    * @param address      address the queue is bound to
+    * @param name         name of the queue
+    * @param filterString optional filter of the queue
+    * @param temporary    whether the queue is temporary
+    * @param durable      whether the queue is durable
+    * @throws Exception if the queue cannot be created
+    */
+   @Override
+   public void createQueue(final SimpleString address,
+                           final SimpleString name,
+                           final SimpleString filterString,
+                           final boolean temporary,
+                           final boolean durable) throws Exception
+   {
+      if (!this.internal)
+      {
+         super.createQueue(address, name, filterString, temporary, durable);
+         return;
+      }
+
+      server.createQueue(address, name, filterString, durable, temporary);
+
+      if (temporary)
+      {
+         // Temporary queue in core simply means the queue will be deleted if
+         // the remoting connection
+         // dies. It does not mean it will get deleted automatically when the
+         // session is closed.
+         // It is up to the user to delete the queue when finished with it
+
+         TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(server, name);
+
+         remotingConnection.addCloseListener(cleaner);
+         remotingConnection.addFailureListener(cleaner);
+
+         tempQueueCleannerUppers.put(name, cleaner);
+      }
+
+      if (HornetQServerLogger.LOGGER.isDebugEnabled())
+      {
+         // report the address the queue is bound to, not the queue name a second time
+         HornetQServerLogger.LOGGER.debug("Queue " + name + " created on address " + address +
+                                             " with filter=" + filterString + " temporary = " +
+                                             temporary + " durable=" + durable +
+                                             " on session user=" + this.username + ", connection=" +
+                                             this.remotingConnection);
+      }
+
+   }
+
+   /**
+    * Routes a message. Non-internal sessions delegate to the superclass.
+    * Internal sessions route directly through the post office and then record
+    * the message's user id and a send counter for its address in
+    * {@code targetAddressInfos}.
+    *
+    * @param msg    the message to route
+    * @param direct passed to {@code postOffice.route}
+    */
+   @Override
+   protected void doSend(final ServerMessage msg, final boolean direct) throws Exception
+   {
+      if (!this.internal)
+      {
+         super.doSend(msg, direct);
+         return;
+      }
+
+      //bypass security check for internal sessions
+      if (tx != null && !autoCommitSends)
+      {
+         routingContext.setTransaction(tx);
+      }
+
+      try
+      {
+         postOffice.route(msg, routingContext, direct);
+
+         Pair<UUID, AtomicLong> sendInfo = targetAddressInfos.get(msg.getAddress());
+
+         if (sendInfo != null)
+         {
+            sendInfo.setA(msg.getUserID());
+            sendInfo.getB().incrementAndGet();
+         }
+         else
+         {
+            targetAddressInfos.put(msg.getAddress(),
+                                   new Pair<UUID, AtomicLong>(msg.getUserID(), new AtomicLong(1)));
+         }
+      }
+      finally
+      {
+         routingContext.clear();
+      }
+   }
+
+   /**
+    * Factory hook that creates an {@link AMQServerConsumer} for this session.
+    * <p>
+    * NOTE(review): only {@code consumerID}, {@code binding}, {@code filter},
+    * {@code browseOnly}, {@code supportLargeMessage} and {@code credits} are
+    * taken from the arguments. {@code serverSessionImpl} and the parameters
+    * ending in "2" are ignored; {@code this} and the session's own members of
+    * the same base name are passed instead. Confirm that this is intended.
+    *
+    * @return the new consumer
+    */
+   @Override
+   protected ServerConsumer newConsumer(long consumerID,
+         ServerSessionImpl serverSessionImpl, QueueBinding binding,
+         Filter filter, boolean started2, boolean browseOnly,
+         StorageManager storageManager2, SessionCallback callback2,
+         boolean preAcknowledge2, boolean strictUpdateDeliveryCount2,
+         ManagementService managementService2, boolean supportLargeMessage,
+         Integer credits) throws Exception
+   {
+      return new AMQServerConsumer(consumerID,
+            this,
+            (QueueBinding) binding,
+            filter,
+            started,
+            browseOnly,
+            storageManager,
+            callback,
+            preAcknowledge,
+            strictUpdateDeliveryCount,
+            managementService,
+            supportLargeMessage,
+            credits);
+   }
+
+   /**
+    * Looks up a consumer of this session by its native id.
+    *
+    * @param nativeId key used in the {@code consumers} map
+    * @return the value stored under that key, cast to {@link AMQServerConsumer};
+    *         presumably {@code null} when nothing is registered -- callers should check
+    */
+   public AMQServerConsumer getConsumer(long nativeId)
+   {
+      return (AMQServerConsumer) this.consumers.get(nativeId);
+   }
+
+   /**
+    * Marks this session as internal or not; see {@link #isInternal()}.
+    *
+    * @param internal the new value of the flag
+    */
+   public void setInternal(boolean internal)
+   {
+      this.internal = internal;
+   }
+
+   /**
+    * @return whether this session is flagged as internal, in which case
+    *         createConsumer, createQueue and doSend do not delegate to the superclass
+    */
+   public boolean isInternal()
+   {
+      return this.internal;
+   }
+
+   public void moveToDeadLetterAddress(long consumerId, long mid, Throwable 
cause) throws Exception
+   {
+      AMQServerConsumer consumer = getConsumer(consumerId);
+      consumer.moveToDeadLetterAddress(mid, cause);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSessionFactory.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSessionFactory.java
new file mode 100644
index 0000000..db044a0
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQServerSessionFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.security.SecurityStore;
+import org.hornetq.core.server.ServerSessionFactory;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
+ * {@link ServerSessionFactory} that produces {@link AMQServerSession} instances.
+ */
+public class AMQServerSessionFactory implements ServerSessionFactory
+{
+
+   /**
+    * Creates an {@link AMQServerSession}, passing every argument unchanged and
+    * in the same order to its constructor.
+    *
+    * @return the new session
+    * @throws Exception whatever the {@link AMQServerSession} constructor throws
+    */
+   @Override
+   public ServerSessionImpl createCoreSession(String name, String username,
+         String password, int minLargeMessageSize, boolean autoCommitSends,
+         boolean autoCommitAcks, boolean preAcknowledge,
+         boolean persistDeliveryCountBeforeDelivery, boolean xa,
+         RemotingConnection connection, StorageManager storageManager,
+         PostOffice postOffice, ResourceManager resourceManager,
+         SecurityStore securityStore, ManagementService managementService,
+         HornetQServerImpl hornetQServerImpl, SimpleString managementAddress,
+         SimpleString simpleString, SessionCallback callback,
+         OperationContext context) throws Exception
+   {
+      return new AMQServerSession(name, username, password, 
minLargeMessageSize, autoCommitSends,
+            autoCommitAcks, preAcknowledge, 
persistDeliveryCountBeforeDelivery, xa,
+            connection, storageManager, postOffice, resourceManager, 
securityStore,
+            managementService, hornetQServerImpl, managementAddress, 
simpleString, callback,
+            context);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSession.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSession.java
new file mode 100644
index 0000000..34f0ff4
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSession.java
@@ -0,0 +1,590 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.wireformat.WireFormat;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.protocol.openwire.OpenWireConnection;
+import org.hornetq.core.protocol.openwire.OpenWireMessageConverter;
+import org.hornetq.core.protocol.openwire.OpenWireProtocolManager;
+import org.hornetq.core.protocol.openwire.OpenWireUtil;
+import org.hornetq.core.protocol.openwire.SendingResult;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.ServerConsumer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.ReadyListener;
+
+public class AMQSession implements SessionCallback
+{
+   // Core session backing this AMQ session; assigned in initialize().
+   private AMQServerSession coreSession;
+   private ConnectionInfo connInfo;
+   private SessionInfo sessInfo;
+   private HornetQServer server;
+   private OpenWireConnection connection;
+   //native id -> consumer
+   private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<Long, 
AMQConsumer>();
+   //amq id -> native id
+   // NOTE(review): plain HashMap, unlike the concurrent 'consumers' map above -- confirm
+   // that it is only ever touched from a single thread.
+   private Map<Long, Long> consumerIdMap = new HashMap<Long, Long>();
+
+   private Map<Long, AMQProducer> producers = new HashMap<Long, AMQProducer>();
+
+   // Set to true by createConsumer() after the core session has been started.
+   private AtomicBoolean started = new AtomicBoolean(false);
+
+   // removeConsumer() treats a non-null txId or a true isTx as "session is transacted".
+   private TransactionId txId = null;
+
+   private boolean isTx;
+
+   private OpenWireProtocolManager manager;
+
+   /**
+    * Stores the collaborators of this session. The core session itself is not
+    * created here but in {@link #initialize()}.
+    */
+   public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo,
+         HornetQServer server, OpenWireConnection connection, 
OpenWireProtocolManager manager)
+   {
+      this.connInfo = connInfo;
+      this.sessInfo = sessInfo;
+      this.server = server;
+      this.connection = connection;
+      this.manager = manager;
+   }
+
+   /**
+    * Creates the backing core session, named after the AMQ session id and
+    * using the connection's credentials. A session whose id value is -1 is
+    * also registered on the connection as its advisory session.
+    * <p>
+    * Any exception raised while doing so is logged and not rethrown, in which
+    * case {@code coreSession} may stay unset.
+    */
+   public void initialize()
+   {
+      final String sessionName = sessInfo.getSessionId().toString();
+      final String user = connInfo.getUserName();
+      final String pass = connInfo.getPassword();
+
+      // disable minLargeMessageSize for now
+      final int minLargeMessageSize = Integer.MAX_VALUE;
+
+      try
+      {
+         coreSession = (AMQServerSession) server.createSession(sessionName, user, pass,
+               minLargeMessageSize, connection, true, false, false, false,
+               null, this, new AMQServerSessionFactory());
+
+         if (sessInfo.getSessionId().getValue() == -1)
+         {
+            this.connection.setAdvisorySession(this);
+         }
+      }
+      catch (Exception e)
+      {
+         HornetQServerLogger.LOGGER.error("error init session", e);
+      }
+
+   }
+
+   public void createConsumer(ConsumerInfo info) throws Exception
+   {
+      //check destination
+      ActiveMQDestination dest = info.getDestination();
+      ActiveMQDestination[] dests = null;
+      if (dest.isComposite())
+      {
+         dests = dest.getCompositeDestinations();
+      }
+      else
+      {
+         dests = new ActiveMQDestination[] {dest};
+      }
+
+      for (ActiveMQDestination d : dests)
+      {
+         AMQConsumer consumer = new AMQConsumer(this, d, info);
+         consumer.init();
+         consumers.put(consumer.getNativeId(), consumer);
+         this.consumerIdMap.put(info.getConsumerId().getValue(), 
consumer.getNativeId());
+      }
+      coreSession.start();
+      started.set(true);
+   }
+
+   /**
+    * Not implemented: the callback is accepted and ignored.
+    */
+   @Override
+   public void sendProducerCreditsMessage(int credits, SimpleString address)
+   {
+      // TODO Auto-generated method stub
+
+   }
+
+   /**
+    * Not implemented: the callback is accepted and ignored.
+    */
+   @Override
+   public void sendProducerCreditsFailMessage(int credits, SimpleString 
address)
+   {
+      // TODO Auto-generated method stub
+
+   }
+
+   /**
+    * Hands a message to the AMQ consumer registered under the id of the given
+    * core consumer.
+    *
+    * @param message       the message to deliver
+    * @param consumerID    the core consumer the message is delivered through
+    * @param deliveryCount delivery count forwarded to the AMQ consumer
+    * @return whatever the AMQ consumer's {@code handleDeliver} returns
+    */
+   @Override
+   public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount)
+   {
+      return consumers.get(consumerID.getID()).handleDeliver(message, deliveryCount);
+   }
+
+   /**
+    * Not implemented: all arguments are ignored and 0 is always returned.
+    *
+    * @return always 0
+    */
+   @Override
+   public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID,
+         long bodySize, int deliveryCount)
+   {
+      // TODO not implemented yet; nothing is delivered
+      return 0;
+   }
+
+   /**
+    * Not implemented: all arguments are ignored and 0 is always returned.
+    *
+    * @return always 0
+    */
+   @Override
+   public int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body,
+         boolean continues, boolean requiresResponse)
+   {
+      // TODO not implemented yet; nothing is delivered
+      return 0;
+   }
+
+   /**
+    * Not implemented: currently does nothing.
+    */
+   @Override
+   public void closed()
+   {
+      // TODO not implemented yet; intentionally a no-op
+
+   }
+
+   /**
+    * Not implemented: the listener is ignored and never registered.
+    *
+    * @param listener ignored
+    */
+   @Override
+   public void addReadyListener(ReadyListener listener)
+   {
+      // TODO not implemented yet; intentionally a no-op
+
+   }
+
+   /**
+    * Not implemented: the listener is ignored.
+    *
+    * @param listener ignored
+    */
+   @Override
+   public void removeReadyListener(ReadyListener listener)
+   {
+      // TODO not implemented yet; intentionally a no-op
+
+   }
+
+   /**
+    * Reports that credits are available; the consumer is not inspected.
+    *
+    * @param consumerID ignored
+    * @return always {@code true}
+    */
+   @Override
+   public boolean hasCredits(ServerConsumer consumerID)
+   {
+      return true;
+   }
+
+   /**
+    * Not implemented: both arguments are ignored and nothing is disconnected.
+    *
+    * @param consumerId ignored
+    * @param queueName ignored
+    */
+   @Override
+   public void disconnect(ServerConsumer consumerId, String queueName)
+   {
+      // TODO not implemented yet; intentionally a no-op
+
+   }
+
+   /**
+    * @return the core session backing this OpenWire session
+    */
+   public AMQServerSession getCoreSession()
+   {
+      return coreSession;
+   }
+
+   /**
+    * @return the server this session was created on
+    */
+   public HornetQServer getCoreServer()
+   {
+      return server;
+   }
+
+   public void removeConsumer(ConsumerInfo info) throws Exception
+   {
+      long consumerId = info.getConsumerId().getValue();
+      long nativeId = this.consumerIdMap.remove(consumerId);
+      if (this.txId != null || this.isTx)
+      {
+         ((AMQServerSession)coreSession).amqCloseConsumer(nativeId, false);
+      }
+      else
+      {
+         ((AMQServerSession)coreSession).amqCloseConsumer(nativeId, true);
+      }
+      AMQConsumer consumer = consumers.remove(nativeId);
+   }
+
+   /**
+    * Creates and initialises an {@link AMQProducer} for the given info and
+    * registers it under the producer id's value.
+    *
+    * @param info the OpenWire producer description
+    */
+   public void createProducer(ProducerInfo info)
+   {
+      final AMQProducer created = new AMQProducer(this, info);
+      created.init();
+      producers.put(info.getProducerId().getValue(), created);
+   }
+
+   /**
+    * Unregisters the producer described by the given info.
+    *
+    * @param info the producer whose id is to be removed
+    */
+   public void removeProducer(ProducerInfo info)
+   {
+      final ProducerId id = info.getProducerId();
+      removeProducer(id);
+   }
+
+   /**
+    * Unregisters the producer stored under the given id's value, if any.
+    *
+    * @param id the id of the producer to remove
+    */
+   public void removeProducer(ProducerId id)
+   {
+      this.producers.remove(id.getValue());
+   }
+
+   public SendingResult send(AMQProducerBrokerExchange producerExchange,
+         Message messageSend, boolean sendProducerAck) throws Exception
+   {
+      SendingResult result = new SendingResult();
+      TransactionId tid = messageSend.getTransactionId();
+      if (tid != null)
+      {
+         resetSessionTx(tid);
+      }
+
+      messageSend.setBrokerInTime(System.currentTimeMillis());
+
+      ActiveMQDestination destination = messageSend.getDestination();
+      ActiveMQDestination[] actualDestinations = null;
+      if (destination.isComposite())
+      {
+         actualDestinations = destination.getCompositeDestinations();
+      }
+      else
+      {
+         actualDestinations = new ActiveMQDestination[] {destination};
+      }
+
+      for (ActiveMQDestination dest : actualDestinations)
+      {
+         ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024);
+         OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, 
connection.getMarshaller());
+         SimpleString address = OpenWireUtil.toCoreAddress(dest);
+         coreMsg.setAddress(address);
+
+         PagingStoreImpl store = 
(PagingStoreImpl)server.getPagingManager().getPageStore(address);
+         if (store.isFull())
+         {
+            result.setBlockNextSend(true);
+            result.setBlockPagingStore(store);
+            result.setBlockingAddress(address);
+            //now we hold this message send until the store has space.
+            //we do this by put it in a scheduled task
+            ScheduledExecutorService scheduler = server.getScheduledPool();
+            Runnable sendRetryTask = new SendRetryTask(coreMsg, 
producerExchange, sendProducerAck,
+                                                       messageSend.getSize(), 
messageSend.getCommandId());
+            scheduler.schedule(sendRetryTask, 10, TimeUnit.MILLISECONDS);
+         }
+         else
+         {
+            coreSession.send(coreMsg, false);
+         }
+      }
+      return result;
+   }
+
+   /**
+    * @return the wire format of the owning connection
+    */
+   public WireFormat getMarshaller()
+   {
+      return connection.getMarshaller();
+   }
+
+   /**
+    * Forwards an acknowledgement to the consumer it is addressed to.
+    * <p>
+    * If the ack carries a transaction id the session is first bound to that
+    * transaction. A standard ack outside any transaction is followed by a
+    * commit of the core session.
+    *
+    * @param ack the OpenWire acknowledgement
+    * @throws IllegalStateException if no consumer is registered for the
+    *            ack's consumer id (previously this surfaced as a bare
+    *            {@code NullPointerException})
+    * @throws Exception propagated from the transaction reset, the consumer
+    *            or the commit
+    */
+   public void acknowledge(MessageAck ack) throws Exception
+   {
+      TransactionId tid = ack.getTransactionId();
+      if (tid != null)
+      {
+         this.resetSessionTx(tid);
+      }
+
+      ConsumerId consumerId = ack.getConsumerId();
+      // Keep the mapped id boxed so a missing mapping can be detected
+      // instead of failing during auto-unboxing.
+      Long nativeConsumerId = consumerIdMap.get(consumerId.getValue());
+      AMQConsumer consumer = nativeConsumerId == null ? null : consumers.get(nativeConsumerId);
+      if (consumer == null)
+      {
+         throw new IllegalStateException("Cannot acknowledge: no consumer registered for "
+               + consumerId);
+      }
+      consumer.acknowledge(ack);
+
+      if (tid == null && ack.getAckType() == MessageAck.STANDARD_ACK_TYPE)
+      {
+         this.coreSession.commit();
+      }
+   }
+
+   /**
+    * Binds this session to the given transaction.
+    * <p>
+    * AMQ sessions and transactions are created separately; whether a session
+    * is transactional is only known once a command carrying a transaction
+    * id arrives. The first such id switches the core session to XA mode
+    * (starting the XA transaction) or to local transaction mode, and
+    * registers the transaction with the manager. Later calls with the same
+    * id change nothing.
+    *
+    * @param xid the transaction to associate with this session
+    * @throws IllegalStateException if the session is already bound to a
+    *            different transaction
+    * @throws Exception propagated from the core session
+    */
+   public void resetSessionTx(TransactionId xid) throws Exception
+   {
+      final boolean alreadyBound = this.txId != null;
+      if (alreadyBound && !this.txId.equals(xid))
+      {
+         throw new IllegalStateException("Session already associated with a tx");
+      }
+
+      this.isTx = true;
+      if (alreadyBound)
+      {
+         return;
+      }
+
+      // first transactional command seen: reset the session
+      this.txId = xid;
+
+      if (!xid.isXATransaction())
+      {
+         coreSession.enableTx();
+      }
+      else
+      {
+         XATransactionId xaXid = (XATransactionId) xid;
+         coreSession.enableXA();
+         XidImpl coreXid = new XidImpl(xaXid.getBranchQualifier(),
+               xaXid.getFormatId(), xaXid.getGlobalTransactionId());
+         coreSession.xaStart(coreXid);
+      }
+
+      this.manager.registerTx(this.txId, this);
+   }
+
+   private void checkTx(TransactionId inId)
+   {
+      if (this.txId == null)
+      {
+         throw new IllegalStateException("Session has no transaction 
associated with it");
+      }
+
+      if (!this.txId.equals(inId))
+      {
+         throw new IllegalStateException("Session already associated with 
another tx");
+      }
+
+      this.isTx = true;
+   }
+
+   /**
+    * Commits the transaction currently bound to this session in one phase.
+    * <p>
+    * An XA transaction is committed through {@code xaCommit(xid, true)}.
+    * For a local transaction every consumer's {@code finishTx()} is called
+    * before the core session is committed. On success the session is
+    * detached from the transaction.
+    *
+    * @param info carries the id of the transaction to commit
+    * @throws IllegalStateException if the session is not bound to that
+    *            transaction
+    * @throws Exception propagated from the core session or the consumers
+    */
+   public void commitOnePhase(TransactionInfo info) throws Exception
+   {
+      checkTx(info.getTransactionId());
+
+      if (!txId.isXATransaction())
+      {
+         for (AMQConsumer each : consumers.values())
+         {
+            each.finishTx();
+         }
+         this.coreSession.commit();
+      }
+      else
+      {
+         XATransactionId xaId = (XATransactionId) txId;
+         XidImpl coreXid = new XidImpl(xaId.getBranchQualifier(),
+               xaId.getFormatId(), xaId.getGlobalTransactionId());
+         this.coreSession.xaCommit(coreXid, true);
+      }
+
+      this.txId = null;
+   }
+
+   /**
+    * Prepares the given XA transaction on the core session.
+    *
+    * @param xid the XA transaction to prepare
+    * @throws IllegalStateException if the session is not bound to that
+    *            transaction
+    * @throws Exception propagated from the core session
+    */
+   public void prepareTransaction(XATransactionId xid) throws Exception
+   {
+      checkTx(xid);
+      final XidImpl coreXid = new XidImpl(xid.getBranchQualifier(),
+            xid.getFormatId(), xid.getGlobalTransactionId());
+      coreSession.xaPrepare(coreXid);
+   }
+
+   /**
+    * Commits the given XA transaction through {@code xaCommit(xid, false)}
+    * and detaches the session from it.
+    *
+    * @param xid the XA transaction to commit
+    * @throws IllegalStateException if the session is not bound to that
+    *            transaction
+    * @throws Exception propagated from the core session
+    */
+   public void commitTwoPhase(XATransactionId xid) throws Exception
+   {
+      checkTx(xid);
+      final XidImpl coreXid = new XidImpl(xid.getBranchQualifier(),
+            xid.getFormatId(), xid.getGlobalTransactionId());
+      coreSession.xaCommit(coreXid, false);
+
+      txId = null;
+   }
+
+   /**
+    * Rolls back the transaction currently bound to this session and
+    * detaches the session from it.
+    * <p>
+    * An XA transaction is rolled back through {@code xaRollback}. For a
+    * local transaction every consumer's {@code rollbackTx} is called with a
+    * shared set, which is then passed to {@code amqRollback}.
+    *
+    * @param info carries the id of the transaction to roll back
+    * @throws IllegalStateException if the session is not bound to that
+    *            transaction
+    * @throws Exception propagated from the core session or the consumers
+    */
+   public void rollback(TransactionInfo info) throws Exception
+   {
+      checkTx(info.getTransactionId());
+
+      if (!this.txId.isXATransaction())
+      {
+         final Set<Long> acked = new HashSet<Long>();
+         for (AMQConsumer each : consumers.values())
+         {
+            each.rollbackTx(acked);
+         }
+         //on local rollback, amq broker doesn't do anything about the delivered
+         //messages, which stay at clients until next time
+         this.coreSession.amqRollback(acked);
+      }
+      else
+      {
+         XATransactionId xaId = (XATransactionId) txId;
+         XidImpl coreXid = new XidImpl(xaId.getBranchQualifier(),
+               xaId.getFormatId(), xaId.getGlobalTransactionId());
+         this.coreSession.xaRollback(coreXid);
+      }
+
+      this.txId = null;
+   }
+
+   /**
+    * Appends to the given list one {@link XATransactionId} for every
+    * in-doubt Xid reported by the core session.
+    *
+    * @param recovered the list the transaction ids are added to
+    */
+   public void recover(List<TransactionId> recovered)
+   {
+      final List<Xid> inDoubt = coreSession.xaGetInDoubtXids();
+      for (Xid each : inDoubt)
+      {
+         recovered.add(new XATransactionId(each));
+      }
+   }
+
+   /**
+    * Forgets the given transaction on the core session and detaches the
+    * session from it. The id is cast to {@link XATransactionId}.
+    *
+    * @param tid the transaction to forget
+    * @throws IllegalStateException if the session is not bound to that
+    *            transaction
+    * @throws Exception propagated from the core session
+    */
+   public void forget(final TransactionId tid) throws Exception
+   {
+      checkTx(tid);
+      final XATransactionId xaId = (XATransactionId) tid;
+      final XidImpl coreXid = new XidImpl(xaId.getBranchQualifier(),
+            xaId.getFormatId(), xaId.getGlobalTransactionId());
+      coreSession.xaForget(coreXid);
+      txId = null;
+   }
+
+   /**
+    * @return the connection info held by this session
+    */
+   public ConnectionInfo getConnectionInfo()
+   {
+      return connInfo;
+   }
+
+   /**
+    * Passes the internal flag on to the core session.
+    *
+    * @param internal the value to set on the core session
+    */
+   public void setInternal(boolean internal)
+   {
+      coreSession.setInternal(internal);
+   }
+
+   /**
+    * @return the internal flag as reported by the core session
+    */
+   public boolean isInternal()
+   {
+      return coreSession.isInternal();
+   }
+
+   /**
+    * Hands a message dispatch to the owning connection for delivery.
+    *
+    * @param dispatch the dispatch to deliver
+    */
+   public void deliverMessage(MessageDispatch dispatch)
+   {
+      connection.deliverMessage(dispatch);
+   }
+
+   /**
+    * Closes the core session, passing {@code false} to its {@code close}.
+    *
+    * @throws Exception propagated from the core session
+    */
+   public void close() throws Exception
+   {
+      coreSession.close(false);
+   }
+
+   private class SendRetryTask implements Runnable
+   {
+      private ServerMessage coreMsg;
+      private AMQProducerBrokerExchange producerExchange;
+      private boolean sendProducerAck;
+      private int msgSize;
+      private int commandId;
+
+      public SendRetryTask(ServerMessage coreMsg, AMQProducerBrokerExchange 
producerExchange,
+            boolean sendProducerAck, int msgSize, int commandId)
+      {
+         this.coreMsg = coreMsg;
+         this.producerExchange = producerExchange;
+         this.sendProducerAck = sendProducerAck;
+         this.msgSize = msgSize;
+         this.commandId = commandId;
+      }
+
+      @Override
+      public void run()
+      {
+         synchronized (AMQSession.this)
+         {
+            try
+            {
+               // check pageStore
+               SimpleString address = coreMsg.getAddress();
+               PagingStoreImpl store = (PagingStoreImpl) server
+                     .getPagingManager().getPageStore(address);
+               if (store.isFull())
+               {
+                  // if store is still full, schedule another
+                  server.getScheduledPool().schedule(this, 10, 
TimeUnit.MILLISECONDS);
+               }
+               else
+               {
+                  // now send the message again.
+                  coreSession.send(coreMsg, false);
+
+                  if (sendProducerAck)
+                  {
+                     ProducerInfo producerInfo = producerExchange
+                           .getProducerState().getInfo();
+                     ProducerAck ack = new ProducerAck(
+                           producerInfo.getProducerId(), msgSize);
+                     connection.dispatchAsync(ack);
+                  }
+                  else
+                  {
+                     Response response = new Response();
+                     response.setCorrelationId(commandId);
+                     connection.dispatchAsync(response);
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               ExceptionResponse response = new ExceptionResponse(e);
+               response.setCorrelationId(commandId);
+               connection.dispatchAsync(response);
+            }
+         }
+
+      }
+   }
+
+   /**
+    * Blocks the calling thread until the paging store recorded in the given
+    * result is no longer full.
+    * <p>
+    * The producer exchange is flagged as blocking on flow control for the
+    * duration of the wait; the flag is cleared again even when the wait
+    * ends with an exception. While waiting, a warning is logged immediately
+    * and then every 30 seconds. The store is polled with a short sleep
+    * between checks rather than in a tight loop, so the waiting thread does
+    * not consume a full CPU.
+    *
+    * @param producerExchange the exchange of the blocked producer
+    * @param result the send result naming the full store and its address
+    * @throws IOException if the connection is stopping, or if the thread is
+    *            interrupted while waiting (the interrupt flag is restored)
+    */
+   public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange,
+         SendingResult result) throws IOException
+   {
+      // pause between two checks of the store
+      final long pollIntervalMillis = 10;
+      //Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL
+      final long blockedProducerWarningInterval = 30000;
+
+      long start = System.currentTimeMillis();
+      long nextWarn = start;
+      producerExchange.blockingOnFlowControl(true);
+      try
+      {
+         AMQConnectionContext context = producerExchange.getConnectionContext();
+         PagingStoreImpl store = result.getBlockPagingStore();
+         ProducerId producerId = producerExchange.getProducerState().getInfo().getProducerId();
+
+         while (store.isFull())
+         {
+            if (context.getStopping().get())
+            {
+               throw new IOException("Connection closed, send aborted.");
+            }
+
+            long now = System.currentTimeMillis();
+            if (now >= nextWarn)
+            {
+               HornetQServerLogger.LOGGER.warn("Memory Limit reached. Producer ("
+                                  + producerId + ") stopped to prevent flooding "
+                                  + result.getBlockingAddress()
+                                  + " See http://activemq.apache.org/producer-flow-control.html for more info"
+                                  + " (blocking for " + ((now - start) / 1000) + "s)");
+               nextWarn = now + blockedProducerWarningInterval;
+            }
+
+            try
+            {
+               Thread.sleep(pollIntervalMillis);
+            }
+            catch (InterruptedException e)
+            {
+               // keep the interrupt visible to callers and stop waiting
+               Thread.currentThread().interrupt();
+               throw new IOException("Interrupted while waiting for space on "
+                     + result.getBlockingAddress(), e);
+            }
+         }
+      }
+      finally
+      {
+         producerExchange.blockingOnFlowControl(false);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java
new file mode 100644
index 0000000..d62956b
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSharedDeadLetterStrategy.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+
+/**
+ * Dead letter strategy that answers with one shared dead letter queue for
+ * every message. The queue defaults to
+ * {@link #DEFAULT_DEAD_LETTER_QUEUE_NAME} and can be replaced through
+ * {@link #setDeadLetterQueue(ActiveMQDestination)}.
+ */
+public class AMQSharedDeadLetterStrategy extends AMQAbstractDeadLetterStrategy
+{
+   public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "ActiveMQ.DLQ";
+
+   private ActiveMQDestination deadLetterQueue =
+      new ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME);
+
+   /**
+    * @param message ignored
+    * @param subscription ignored
+    * @return the shared dead letter queue
+    */
+   public ActiveMQDestination getDeadLetterQueueFor(Message message,
+         AMQSubscription subscription)
+   {
+      return this.deadLetterQueue;
+   }
+
+   /**
+    * @return the shared dead letter queue
+    */
+   public ActiveMQDestination getDeadLetterQueue()
+   {
+      return this.deadLetterQueue;
+   }
+
+   /**
+    * @param deadLetterQueue the destination to use as the shared queue
+    */
+   public void setDeadLetterQueue(ActiveMQDestination deadLetterQueue)
+   {
+      this.deadLetterQueue = deadLetterQueue;
+   }
+
+   /**
+    * @param destination the destination to test
+    * @return {@code true} if the destination equals the shared dead letter
+    *         queue
+    */
+   @Override
+   public boolean isDLQ(ActiveMQDestination destination)
+   {
+      return destination.equals(this.deadLetterQueue);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java
new file mode 100644
index 0000000..5ffe1ac
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSingleTransportConnectionStateRegister.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.SessionId;
+
+/**
+ * Connection state register that holds at most one connection id together
+ * with its state.
+ * <p>
+ * We just copy this structure from amq, but what's the purpose
+ * and can it be removed ?
+ *
+ * @author howard
+ *
+ */
+public class AMQSingleTransportConnectionStateRegister implements
+      AMQTransportConnectionStateRegister
+{
+
+   private AMQTransportConnectionState connectionState;
+   private ConnectionId connectionId;
+
+   /**
+    * Stores the given id and state, replacing whatever was registered.
+    *
+    * @return the previously registered state, or {@code null} if none
+    */
+   public AMQTransportConnectionState registerConnectionState(
+         ConnectionId connectionId, AMQTransportConnectionState state)
+   {
+      AMQTransportConnectionState rc = connectionState;
+      connectionState = state;
+      this.connectionId = connectionId;
+      return rc;
+   }
+
+   /**
+    * Removes the registered state if it was registered under the given id.
+    *
+    * @return the removed state, or {@code null} if nothing was registered
+    *         under that id
+    */
+   public synchronized AMQTransportConnectionState unregisterConnectionState(
+         ConnectionId connectionId)
+   {
+      AMQTransportConnectionState rc = null;
+
+      if (connectionId != null && connectionState != null
+            && this.connectionId != null)
+      {
+         if (this.connectionId.equals(connectionId))
+         {
+            rc = connectionState;
+            connectionState = null;
+            // clear the field; the unqualified name is the parameter, so
+            // assigning to it would leave the stale id registered
+            this.connectionId = null;
+         }
+      }
+      return rc;
+   }
+
+   /**
+    * @return a new list holding the registered state, or an empty list
+    */
+   public synchronized List<AMQTransportConnectionState> listConnectionStates()
+   {
+      List<AMQTransportConnectionState> rc = new ArrayList<AMQTransportConnectionState>();
+      if (connectionState != null)
+      {
+         rc.add(connectionState);
+      }
+      return rc;
+   }
+
+   /**
+    * @param connectionId only used in the error message
+    * @return the registered state
+    * @throws IllegalStateException if no state is registered
+    */
+   public synchronized AMQTransportConnectionState lookupConnectionState(
+         String connectionId)
+   {
+      AMQTransportConnectionState cs = connectionState;
+      if (cs == null)
+      {
+         throw new IllegalStateException(
+               "Cannot lookup a connectionId for a connection that had not been registered: "
+                     + connectionId);
+      }
+      return cs;
+   }
+
+   /**
+    * @param id only used in the error message
+    * @return the registered state
+    * @throws IllegalStateException if no state is registered
+    */
+   public synchronized AMQTransportConnectionState lookupConnectionState(
+         ConsumerId id)
+   {
+      AMQTransportConnectionState cs = connectionState;
+      if (cs == null)
+      {
+         throw new IllegalStateException(
+               "Cannot lookup a consumer from a connection that had not been registered: "
+                     + id.getParentId().getParentId());
+      }
+      return cs;
+   }
+
+   /**
+    * @param id only used in the error message
+    * @return the registered state
+    * @throws IllegalStateException if no state is registered
+    */
+   public synchronized AMQTransportConnectionState lookupConnectionState(
+         ProducerId id)
+   {
+      AMQTransportConnectionState cs = connectionState;
+      if (cs == null)
+      {
+         throw new IllegalStateException(
+               "Cannot lookup a producer from a connection that had not been registered: "
+                     + id.getParentId().getParentId());
+      }
+      return cs;
+   }
+
+   /**
+    * @param id only used in the error message
+    * @return the registered state
+    * @throws IllegalStateException if no state is registered
+    */
+   public synchronized AMQTransportConnectionState lookupConnectionState(
+         SessionId id)
+   {
+      AMQTransportConnectionState cs = connectionState;
+      if (cs == null)
+      {
+         throw new IllegalStateException(
+               "Cannot lookup a session from a connection that had not been registered: "
+                     + id.getParentId());
+      }
+      return cs;
+   }
+
+   /**
+    * Unlike the other lookups this one does not fail when nothing is
+    * registered.
+    *
+    * @param connectionId not consulted
+    * @return the registered state, or {@code null} if none
+    */
+   public synchronized AMQTransportConnectionState lookupConnectionState(
+         ConnectionId connectionId)
+   {
+      return connectionState;
+   }
+
+   /**
+    * @return always {@code false}
+    */
+   public synchronized boolean doesHandleMultipleConnectionStates()
+   {
+      return false;
+   }
+
+   /**
+    * @return {@code true} if no state is registered
+    */
+   public synchronized boolean isEmpty()
+   {
+      return connectionState == null;
+   }
+
+   /**
+    * Copies the first entry of another register into this one, or clears
+    * this register when the other one is empty.
+    *
+    * @param other the register to copy from
+    */
+   public void intialize(AMQTransportConnectionStateRegister other)
+   {
+
+      if (other.isEmpty())
+      {
+         clear();
+      }
+      else
+      {
+         Map<ConnectionId, AMQTransportConnectionState> map = other.mapStates();
+         Iterator<Entry<ConnectionId, AMQTransportConnectionState>> i =
+            map.entrySet().iterator();
+         Entry<ConnectionId, AMQTransportConnectionState> entry = i.next();
+         connectionId = entry.getKey();
+         connectionState = entry.getValue();
+      }
+
+   }
+
+   /**
+    * @return a new map holding the registered id and state, or an empty
+    *         map if nothing is registered
+    */
+   public Map<ConnectionId, AMQTransportConnectionState> mapStates()
+   {
+      Map<ConnectionId, AMQTransportConnectionState> map =
+         new HashMap<ConnectionId, AMQTransportConnectionState>();
+      if (!isEmpty())
+      {
+         map.put(connectionId, connectionState);
+      }
+      return map;
+   }
+
+   /**
+    * Forgets the registered id and state.
+    */
+   public void clear()
+   {
+      connectionState = null;
+      connectionId = null;
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java
 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java
new file mode 100644
index 0000000..71c0952
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSlowConsumerStrategy.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+/**
+ * Callback contract for handling slow consumers.
+ */
+public interface AMQSlowConsumerStrategy
+{
+
+   /**
+    * Slow consumer event.
+    *
+    * @param context
+    *      Connection context of the subscription.
+    * @param subs
+    *      The subscription object for the slow consumer.
+    */
+   void slowConsumer(AMQConnectionContext context, AMQSubscription subs);
+
+   /**
+    * For strategies that need to examine assigned destinations for slow
+    * consumers periodically, the destination is assigned here.
+    *
+    * If the strategy is event driven it can just ignore the assigned
+    * destination.
+    *
+    * @param destination
+    *      A destination to add to a watch list.
+    */
+   void addDestination(AMQDestination destination);
+
+}

Reply via email to