http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
new file mode 100644
index 0000000..0c20c0f
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.proton.plug.test.minimalserver;
+
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelGroupFuture;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import org.proton.plug.AMQPServerConnectionContext;
+import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
+import org.proton.plug.test.Constants;
+
+/**
+ * A minimal Netty TCP server used by the proton-plug tests.
+ * <p>
+ * Every accepted channel gets a {@link ProtocolDecoder} which hands the received bytes to an
+ * {@link AMQPServerConnectionContext}. No SSL handler is installed on the pipeline.
+ *
+ * @author <a href="[email protected]">Andy Taylor</a>
+ * @author <a href="[email protected]">Tim Fox</a>
+ * @author <a href="[email protected]">Trustin Lee</a>
+ * @author <a href="[email protected]">Jeff Mesnil</a>
+ * @author <a href="[email protected]">Norman Maurer</a>
+ * @version $Rev$, $Date$
+ */
+public class MinimalServer
+{
+
+   static
+   {
+      // Disable resource leak detection for performance reasons by default
+      ResourceLeakDetector.setEnabled(false);
+   }
+
+   // Non-null only while the server is started; doubles as the "already started" marker
+   private Class<? extends ServerChannel> channelClazz;
+
+   private EventLoopGroup eventLoopGroup;
+
+   // Holds the listening (acceptor) channel
+   private volatile ChannelGroup serverChannelGroup;
+
+   // Holds the channels accepted from clients so that stop() can close them
+   private volatile ChannelGroup channelGroup;
+
+   private ServerBootstrap bootstrap;
+
+   private String host;
+
+   // Recorded by start(); not consulted anywhere else in this class
+   private boolean sasl;
+
+   // Port given to start(); main() passes Constants.PORT
+   private int port;
+
+   /**
+    * Binds the server to {@code host:port} and starts accepting connections.
+    * Calling this method while the server is already started is a no-op.
+    *
+    * @param host address to bind to
+    * @param port TCP port to bind to
+    * @param sasl stored on the instance only
+    * @throws Exception if the bind fails; the event loop created for this attempt is released
+    *                   and the server is left in the stopped state
+    */
+   public synchronized void start(String host, int port, final boolean sasl) throws Exception
+   {
+      if (channelClazz != null)
+      {
+         // Already started: keep the settings of the running server untouched
+         return;
+      }
+
+      this.host = host;
+      this.port = port;
+      this.sasl = sasl;
+
+      int threadsToUse = Runtime.getRuntime().availableProcessors() * 3;
+      final Class<? extends ServerChannel> serverChannelClass = NioServerSocketChannel.class;
+      final EventLoopGroup loopGroup = new NioEventLoopGroup(threadsToUse, new SimpleServerThreadFactory("simple-server", true, Thread.currentThread().getContextClassLoader()));
+
+      boolean bound = false;
+      try
+      {
+         final ChannelGroup acceptedChannels = new DefaultChannelGroup("hornetq-accepted-channels", GlobalEventExecutor.INSTANCE);
+         channelGroup = acceptedChannels;
+
+         serverChannelGroup = new DefaultChannelGroup("hornetq-acceptor-channels", GlobalEventExecutor.INSTANCE);
+
+         bootstrap = new ServerBootstrap();
+         bootstrap.group(loopGroup);
+         bootstrap.channel(serverChannelClass);
+
+         ChannelInitializer<Channel> factory = new ChannelInitializer<Channel>()
+         {
+            @Override
+            public void initChannel(Channel channel) throws Exception
+            {
+               // Track the accepted channel, otherwise stop() has nothing to close
+               acceptedChannels.add(channel);
+
+               ChannelPipeline pipeline = channel.pipeline();
+               pipeline.addLast("amqp-handler", new ProtocolDecoder());
+            }
+         };
+         bootstrap.childHandler(factory);
+
+         // AUTO_READ is deliberately left at its default value
+         bootstrap.option(ChannelOption.SO_REUSEADDR, true).
+            childOption(ChannelOption.SO_REUSEADDR, true).
+            childOption(ChannelOption.SO_KEEPALIVE, true).
+            childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+
+         SocketAddress address = new InetSocketAddress(host, port);
+         Channel serverChannel = bootstrap.bind(address).syncUninterruptibly().channel();
+         serverChannelGroup.add(serverChannel);
+
+         // Only mark the server as started once the bind has actually succeeded
+         eventLoopGroup = loopGroup;
+         channelClazz = serverChannelClass;
+         bound = true;
+      }
+      finally
+      {
+         if (!bound)
+         {
+            // Do not leak the I/O threads of a failed start attempt
+            loopGroup.shutdownGracefully();
+         }
+      }
+   }
+
+   /**
+    * Channel handler that forwards every chunk of inbound bytes to the AMQP connection context
+    * created for its channel.
+    */
+   class ProtocolDecoder extends ByteToMessageDecoder
+   {
+
+      // Created in channelActive(), i.e. once per accepted channel
+      AMQPServerConnectionContext connection;
+
+
+      public ProtocolDecoder()
+      {
+      }
+
+      @Override
+      public void channelActive(ChannelHandlerContext ctx) throws Exception
+      {
+         super.channelActive(ctx);
+         connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()));
+      }
+
+      /**
+       * Passes the received bytes to the connection context and flushes the channel.
+       * Nothing is added to {@code out}.
+       */
+      @Override
+      protected void decode(final ChannelHandlerContext ctx, ByteBuf byteIn, List<Object> out) throws Exception
+      {
+         connection.inputBuffer(byteIn);
+         ctx.flush();
+      }
+   }
+
+   /**
+    * Closes the acceptor channel and every accepted channel, then releases the I/O threads.
+    * Calling this method on a server that is not started is a no-op; after it returns the
+    * server can be started again.
+    */
+   public synchronized void stop()
+   {
+      if (channelClazz == null)
+      {
+         // Never started, or already stopped
+         return;
+      }
+
+      // Stop accepting first, then close what was accepted
+      serverChannelGroup.close().awaitUninterruptibly();
+      ChannelGroupFuture future = channelGroup.close().awaitUninterruptibly();
+      if (!future.isSuccess())
+      {
+         System.err.println("Failed to close some of the accepted channels");
+      }
+
+      // Asynchronous: the listening socket is already closed at this point
+      eventLoopGroup.shutdownGracefully();
+
+      // Allow a later start() to bind again
+      channelClazz = null;
+   }
+
+
+   /**
+    * Starts a server on 127.0.0.1:{@link Constants#PORT} and keeps the JVM alive.
+    */
+   public static void main(String[] arg)
+   {
+      MinimalServer server = new MinimalServer();
+      try
+      {
+         server.start("127.0.0.1", Constants.PORT, true);
+
+
+         // The I/O threads are daemons, so the main thread must stay alive
+         while (true)
+         {
+            Thread.sleep(360000000);
+         }
+      }
+      catch (Throwable e)
+      {
+         e.printStackTrace();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
new file mode 100644
index 0000000..30a58bc
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.minimalserver;
+
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.AMQPSessionContext;
+import org.proton.plug.context.ProtonPlugSender;
+import org.proton.plug.context.server.ProtonServerSessionContext;
+import org.proton.plug.SASLResult;
+import org.proton.plug.util.ProtonServerMessage;
+
+/**
+ * Session callback for the minimal test server: queues live in {@link DumbServer} and each
+ * sender is served by a {@link Consumer} polling thread. Most broker operations are no-ops.
+ *
+ * @author Clebert Suconic
+ */
+
+public class MinimalSessionSPI implements AMQPSessionCallback
+{
+
+   private SASLResult result;
+   ProtonServerSessionContext session;
+
+   @Override
+   public void init(AMQPSessionContext session, SASLResult result)
+   {
+      this.session = (ProtonServerSessionContext) session;
+      this.result = result;
+   }
+
+   @Override
+   public void start()
+   {
+   }
+
+   static AtomicInteger tempQueueGenerator = new AtomicInteger(0);
+
+   /**
+    * @return a new name built from a counter shared by all sessions
+    */
+   public String tempQueueName()
+   {
+      return "TempQueueName" + tempQueueGenerator.incrementAndGet();
+   }
+
+   /**
+    * Creates (but does not start) a {@link Consumer} on the named queue.
+    * The other arguments are ignored by this implementation.
+    */
+   @Override
+   public Object createSender(ProtonPlugSender plugSender, String queue, String filer, boolean browserOnly)
+   {
+      Consumer consumer = new Consumer(DumbServer.getQueue(queue));
+      return consumer;
+   }
+
+   @Override
+   public void startSender(Object brokerConsumer)
+   {
+      ((Consumer) brokerConsumer).start();
+   }
+
+   @Override
+   public void createTemporaryQueue(String queueName)
+   {
+
+   }
+
+   @Override
+   public void onFlowConsumer(Object consumer, int credits)
+   {
+   }
+
+   /**
+    * @return always {@code true}
+    */
+   @Override
+   public boolean queueQuery(String queueName)
+   {
+      return true;
+   }
+
+   @Override
+   public void closeSender(Object brokerConsumer)
+   {
+      ((Consumer) brokerConsumer).close();
+   }
+
+   @Override
+   public ProtonJMessage encodeMessage(Object message, int deliveryCount)
+   {
+      // serverSend() stores messages as ProtonServerMessage, so no conversion is needed here
+      return (ProtonServerMessage) message;
+   }
+
+   /**
+    * @return a fixed, single-byte transaction id
+    */
+   @Override
+   public Binary getCurrentTXID()
+   {
+      return new Binary(new byte[]{1});
+   }
+
+   @Override
+   public void commitCurrentTX()
+   {
+   }
+
+   @Override
+   public void rollbackCurrentTX()
+   {
+   }
+
+   @Override
+   public void close()
+   {
+
+   }
+
+   @Override
+   public void ack(Object brokerConsumer, Object message)
+   {
+
+   }
+
+   @Override
+   public void cancel(Object brokerConsumer, Object message, boolean updateCounts)
+   {
+
+   }
+
+   @Override
+   public void resumeDelivery(Object consumer)
+   {
+      System.out.println("Resume delivery!!!");
+      ((Consumer) consumer).start();
+   }
+
+   /**
+    * Decodes the received buffer into a {@link ProtonServerMessage} and appends it to the
+    * queue registered under {@code address}.
+    */
+   @Override
+   public void serverSend(Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf buffer)
+   {
+      ProtonServerMessage serverMessage = new ProtonServerMessage();
+      serverMessage.decode(buffer.nioBuffer());
+
+      BlockingDeque<Object> queue = DumbServer.getQueue(address);
+      queue.add(serverMessage);
+   }
+
+
+   /**
+    * Polls one queue on a dedicated thread and pushes every message to the session.
+    */
+   class Consumer
+   {
+      final BlockingDeque<Object> queue;
+
+      Consumer(BlockingDeque<Object> queue)
+      {
+         this.queue = queue;
+      }
+
+      // Written by start()/close() and read by the polling thread, hence volatile:
+      // without it the loop is not guaranteed to ever observe the stop request
+      volatile boolean running = false;
+      volatile Thread thread;
+
+      /**
+       * Asks the polling thread to stop and waits for it to finish.
+       */
+      public void close()
+      {
+         System.out.println("Closing!!!");
+         running = false;
+
+         // Work on a snapshot so a concurrent update of the field cannot cause an NPE
+         final Thread worker = thread;
+         if (worker != null)
+         {
+            try
+            {
+               worker.join();
+            }
+            catch (InterruptedException e)
+            {
+               // Keep the interrupt visible to the caller instead of swallowing it
+               Thread.currentThread().interrupt();
+            }
+         }
+
+         thread = null;
+      }
+
+      /**
+       * Starts the polling thread unless one already exists.
+       */
+      public synchronized void start()
+      {
+         running = true;
+         if (thread == null)
+         {
+            System.out.println("Start!!!");
+            thread = new Thread()
+            {
+               @Override
+               public void run()
+               {
+                  try
+                  {
+                     while (running)
+                     {
+                        // Bounded wait so the running flag is re-checked at least once a second
+                        Object msg = queue.poll(1, TimeUnit.SECONDS);
+
+                        if (msg != null)
+                        {
+                           session.serverDelivery(msg, Consumer.this, 1);
+                        }
+                     }
+                  }
+                  catch (Exception e)
+                  {
+                     e.printStackTrace();
+                  }
+               }
+            };
+            thread.start();
+         }
+      }
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java
 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java
new file mode 100644
index 0000000..d33c235
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.proton.plug.test.minimalserver;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Thread factory used by the minimal test server. Threads get a sequential name, normal
+ * priority, the configured daemon flag and the configured context class loader.
+ *
+ * @author <a href="mailto:[email protected]";>Tim Fox</a>
+ */
+public final class SimpleServerThreadFactory implements ThreadFactory
+{
+   private final ThreadGroup group;
+
+   private final AtomicInteger threadCount = new AtomicInteger(0);
+
+   private final int threadPriority;
+
+   private final boolean daemon;
+
+   private final ClassLoader tccl;
+
+   /**
+    * @param groupName prefix of the thread group name; the identity hash of this factory is appended
+    * @param daemon    daemon flag applied to every created thread
+    * @param tccl      context class loader applied to every created thread
+    */
+   public SimpleServerThreadFactory(final String groupName, final boolean daemon, final ClassLoader tccl)
+   {
+      this.daemon = daemon;
+      this.tccl = tccl;
+      this.threadPriority = Thread.NORM_PRIORITY;
+      this.group = new ThreadGroup(groupName + "-" + System.identityHashCode(this));
+   }
+
+   /**
+    * Creates a new, not yet started thread for {@code command}.
+    */
+   @Override
+   public Thread newThread(final Runnable command)
+   {
+      final int sequence = threadCount.getAndIncrement();
+
+      // attach the thread to a group only if there is no security manager:
+      // when sandboxed, the code does not have the RuntimePermission modifyThreadGroup
+      final Thread created = System.getSecurityManager() == null
+         ? new Thread(group, command, "Thread-" + sequence + " (" + group.getName() + ")")
+         : new Thread(command, "Thread-" + sequence);
+
+      final PrivilegedAction<Object> applyDaemonAndPriority = new PrivilegedAction<Object>()
+      {
+         public Object run()
+         {
+            created.setDaemon(daemon);
+            created.setPriority(threadPriority);
+            return null;
+         }
+      };
+      AccessController.doPrivileged(applyDaemonAndPriority);
+
+      final PrivilegedAction<Object> applyClassLoader = new PrivilegedAction<Object>()
+      {
+         public Object run()
+         {
+            created.setContextClassLoader(tccl);
+            return null;
+         }
+      };
+      try
+      {
+         AccessController.doPrivileged(applyClassLoader);
+      }
+      catch (java.security.AccessControlException e)
+      {
+         // Best effort: the thread is still returned, with its default class loader
+         e.printStackTrace();
+      }
+
+      return created;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java
 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java
new file mode 100644
index 0000000..be615c4
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.sasl;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.proton.plug.sasl.ClientSASLPlain;
+import org.proton.plug.sasl.PlainSASLResult;
+import org.proton.plug.sasl.ServerSASLPlain;
+
+/**
+ * Checks that credentials encoded by the client side of SASL PLAIN are read back unchanged
+ * by the server side.
+ *
+ * @author Clebert Suconic
+ */
+
+public class PlainSASLTest
+{
+   @Test
+   public void testPlain()
+   {
+      final String expectedUser = "user-me";
+      final String expectedPassword = "password-secret";
+
+      // Client side: encode the credentials
+      byte[] encoded = new ClientSASLPlain(expectedUser, expectedPassword).getBytes();
+
+      // Server side: decode them again
+      PlainSASLResult decoded = (PlainSASLResult) new ServerSASLPlain().processSASL(encoded);
+
+      Assert.assertEquals(expectedUser, decoded.getUser());
+      Assert.assertEquals(expectedPassword, decoded.getPassword());
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java
 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java
new file mode 100644
index 0000000..e1f0744
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.proton.plug.util.CreditsSemaphore;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class CreditsSemaphoreTest
+{
+   final CreditsSemaphore semaphore = new CreditsSemaphore(10);
+
+   final AtomicInteger errors = new AtomicInteger(0);
+
+   final AtomicInteger acquired = new AtomicInteger(0);
+
+   final CountDownLatch waiting = new CountDownLatch(1);
+
+   Thread thread = new Thread()
+   {
+      public void run()
+      {
+         try
+         {
+            for (int i = 0; i < 12; i++)
+            {
+               if (!semaphore.tryAcquire())
+               {
+                  waiting.countDown();
+                  semaphore.acquire();
+               }
+               acquired.incrementAndGet();
+            }
+         }
+         catch (Throwable e)
+         {
+            e.printStackTrace();
+            errors.incrementAndGet();
+         }
+      }
+   };
+
+
+   @Test
+   public void testSetAndRelease() throws Exception
+   {
+      thread.start();
+
+      // 5 seconds would be an eternity here
+      Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0, semaphore.getCredits());
+
+      long timeout = System.currentTimeMillis() + 1000;
+      while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < 
timeout)
+      {
+         Thread.sleep(10);
+      }
+
+      Assert.assertTrue(semaphore.hasQueuedThreads());
+
+      semaphore.setCredits(2);
+
+      thread.join();
+
+      Assert.assertEquals(12, acquired.get());
+
+      Assert.assertFalse(semaphore.hasQueuedThreads());
+   }
+
+   @Test
+   public void testDownAndUp() throws Exception
+   {
+      thread.start();
+
+      // 5 seconds would be an eternity here
+      Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0, semaphore.getCredits());
+
+      long timeout = System.currentTimeMillis() + 1000;
+      while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < 
timeout)
+      {
+         Thread.sleep(10);
+      }
+
+      Assert.assertTrue(semaphore.hasQueuedThreads());
+
+      semaphore.release(2);
+
+      thread.join();
+
+      Assert.assertEquals(12, acquired.get());
+
+      Assert.assertFalse(semaphore.hasQueuedThreads());
+   }
+
+
+   @Test
+   public void testStartedZeroedSetLater() throws Exception
+   {
+      semaphore.setCredits(0);
+
+      thread.start();
+
+      // 5 seconds would be an eternity here
+      Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0, semaphore.getCredits());
+
+      long timeout = System.currentTimeMillis() + 1000;
+      while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < 
timeout)
+      {
+         Thread.sleep(10);
+      }
+
+      Assert.assertTrue(semaphore.hasQueuedThreads());
+
+      Assert.assertEquals(0, acquired.get());
+
+      semaphore.setCredits(12);
+
+      thread.join();
+
+      Assert.assertEquals(12, acquired.get());
+
+      Assert.assertFalse(semaphore.hasQueuedThreads());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java
 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java
new file mode 100644
index 0000000..5d44c88
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java
@@ -0,0 +1,335 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.util;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.proton.plug.util.ReusableLatch;
+
+/**
+ * @author <a href="mailto:[email protected]";>Clebert Suconic</a>
+ */
+public class ReusableLatchTest
+{
+   @Test
+   public void testLatchWithParameterizedDown() throws Exception
+   {
+      ReusableLatch latch = new ReusableLatch(1000);
+
+      latch.countDown(5000);
+
+      Assert.assertTrue(latch.await(1000));
+
+
+      Assert.assertEquals(0, latch.getCount());
+   }
+
+   @Test
+   public void testLatchOnSingleThread() throws Exception
+   {
+      ReusableLatch latch = new ReusableLatch();
+
+      for (int i = 1; i <= 100; i++)
+      {
+         latch.countUp();
+         Assert.assertEquals(i, latch.getCount());
+      }
+
+      for (int i = 100; i > 0; i--)
+      {
+         Assert.assertEquals(i, latch.getCount());
+         latch.countDown();
+         Assert.assertEquals(i - 1, latch.getCount());
+      }
+
+      latch.await();
+   }
+
+   /**
+    * This test will open numberOfThreads threads, and add numberOfAdds on the
+    * VariableLatch After those addthreads are finished, the latch count should
+    * be numberOfThreads * numberOfAdds Then it will open numberOfThreads
+    * threads again releasing numberOfAdds on the VariableLatch After those
+    * releaseThreads are finished, the latch count should be 0 And all the
+    * waiting threads should be finished also
+    *
+    * @throws Exception
+    */
+   @Test
+   public void testLatchOnMultiThread() throws Exception
+   {
+      final ReusableLatch latch = new ReusableLatch();
+
+      latch.countUp(); // We hold at least one, so ThreadWaits won't go away
+
+      final int numberOfThreads = 100;
+      final int numberOfAdds = 100;
+
+      class ThreadWait extends Thread
+      {
+         private volatile boolean waiting = true;
+
+         @Override
+         public void run()
+         {
+            try
+            {
+               if (!latch.await(5000))
+               {
+                  System.err.println("Latch timed out");
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+            waiting = false;
+         }
+      }
+
+      class ThreadAdd extends Thread
+      {
+         private final CountDownLatch latchReady;
+
+         private final CountDownLatch latchStart;
+
+         ThreadAdd(final CountDownLatch latchReady, final CountDownLatch 
latchStart)
+         {
+            this.latchReady = latchReady;
+            this.latchStart = latchStart;
+         }
+
+         @Override
+         public void run()
+         {
+            try
+            {
+               latchReady.countDown();
+               // Everybody should start at the same time, to worse concurrency
+               // effects
+               latchStart.await();
+               for (int i = 0; i < numberOfAdds; i++)
+               {
+                  latch.countUp();
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+
+      CountDownLatch latchReady = new CountDownLatch(numberOfThreads);
+      CountDownLatch latchStart = new CountDownLatch(1);
+
+      ThreadAdd[] threadAdds = new ThreadAdd[numberOfThreads];
+      ThreadWait[] waits = new ThreadWait[numberOfThreads];
+
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         threadAdds[i] = new ThreadAdd(latchReady, latchStart);
+         threadAdds[i].start();
+         waits[i] = new ThreadWait();
+         waits[i].start();
+      }
+
+      latchReady.await();
+      latchStart.countDown();
+
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         threadAdds[i].join();
+      }
+
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         Assert.assertTrue(waits[i].waiting);
+      }
+
+      Assert.assertEquals(numberOfThreads * numberOfAdds + 1, 
latch.getCount());
+
+      class ThreadDown extends Thread
+      {
+         private final CountDownLatch latchReady;
+
+         private final CountDownLatch latchStart;
+
+         ThreadDown(final CountDownLatch latchReady, final CountDownLatch 
latchStart)
+         {
+            this.latchReady = latchReady;
+            this.latchStart = latchStart;
+         }
+
+         @Override
+         public void run()
+         {
+            try
+            {
+               latchReady.countDown();
+               // Everybody should start at the same time, to worse concurrency
+               // effects
+               latchStart.await();
+               for (int i = 0; i < numberOfAdds; i++)
+               {
+                  latch.countDown();
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+
+      latchReady = new CountDownLatch(numberOfThreads);
+      latchStart = new CountDownLatch(1);
+
+      ThreadDown[] down = new ThreadDown[numberOfThreads];
+
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         down[i] = new ThreadDown(latchReady, latchStart);
+         down[i].start();
+      }
+
+      latchReady.await();
+      latchStart.countDown();
+
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         down[i].join();
+      }
+
+      Assert.assertEquals(1, latch.getCount());
+
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         Assert.assertTrue(waits[i].waiting);
+      }
+
+      latch.countDown();
+
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         waits[i].join();
+      }
+
+      Assert.assertEquals(0, latch.getCount());
+
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         Assert.assertFalse(waits[i].waiting);
+      }
+   }
+
+   @Test
+   public void testReuseLatch() throws Exception
+   {
+      final ReusableLatch latch = new ReusableLatch(5);
+      for (int i = 0; i < 5; i++)
+      {
+         latch.countDown();
+      }
+
+      latch.countUp();
+
+      class ThreadWait extends Thread
+      {
+         private volatile boolean waiting = false;
+
+         private volatile Exception e;
+
+         private final CountDownLatch readyLatch = new CountDownLatch(1);
+
+         @Override
+         public void run()
+         {
+            waiting = true;
+            readyLatch.countDown();
+            try
+            {
+               if (!latch.await(1000))
+               {
+                  System.err.println("Latch timed out!");
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+               this.e = e;
+            }
+            waiting = false;
+         }
+      }
+
+      ThreadWait t = new ThreadWait();
+      t.start();
+
+      t.readyLatch.await();
+
+      Assert.assertEquals(true, t.waiting);
+
+      latch.countDown();
+
+      t.join();
+
+      Assert.assertEquals(false, t.waiting);
+
+      Assert.assertNull(t.e);
+
+      latch.countUp();
+
+      t = new ThreadWait();
+      t.start();
+
+      t.readyLatch.await();
+
+      Assert.assertEquals(true, t.waiting);
+
+      latch.countDown();
+
+      t.join();
+
+      Assert.assertEquals(false, t.waiting);
+
+      Assert.assertNull(t.e);
+
+      Assert.assertTrue(latch.await(1000));
+
+      Assert.assertEquals(0, latch.getCount());
+
+      latch.countDown();
+
+      Assert.assertEquals(0, latch.getCount());
+
+   }
+
+   @Test
+   public void testTimeout() throws Exception
+   {
+      ReusableLatch latch = new ReusableLatch();
+
+      latch.countUp();
+
+      long start = System.currentTimeMillis();
+      Assert.assertFalse(latch.await(1000));
+      long end = System.currentTimeMillis();
+
+      Assert.assertTrue("Timeout didn't work correctly", end - start >= 1000 
&& end - start < 2000);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java
 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java
new file mode 100644
index 0000000..5832b1e
--- /dev/null
+++ 
b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.util;
+
+import org.proton.plug.test.AbstractJMSTest;
+import org.proton.plug.test.Constants;
+import org.proton.plug.test.invm.InVMTestConnector;
+import org.proton.plug.test.minimalclient.Connector;
+import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
+import org.proton.plug.test.minimalserver.DumbServer;
+import org.proton.plug.test.minimalserver.MinimalServer;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class SimpleServerAbstractTest
+{
+
+   protected final boolean useSASL;
+   protected final boolean useInVM;
+   protected MinimalServer server = new MinimalServer();
+
+   public SimpleServerAbstractTest(boolean useSASL, boolean useInVM)
+   {
+      this.useSASL = useSASL;
+      this.useInVM = useInVM;
+   }
+
+   @Before
+   public void setUp() throws Exception
+   {
+      DumbServer.clear();
+      AbstractJMSTest.forceGC();
+      if (!useInVM)
+      {
+         server.start("127.0.0.1", Constants.PORT, useSASL);
+      }
+
+
+   }
+
+   @After
+   public void tearDown() throws Exception
+   {
+      if (!useInVM)
+      {
+         server.stop();
+      }
+      DumbServer.clear();
+   }
+
+   protected Connector newConnector()
+   {
+      if (useInVM)
+      {
+         return new InVMTestConnector();
+      }
+      else
+      {
+         return new SimpleAMQPConnector();
+      }
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
index 8321533..d515fa3 100644
--- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
+++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
@@ -100,6 +100,8 @@ public class StompDecoder
 
    public static final byte C = (byte) 'C';
 
+   public static final byte c = (byte) 'c';
+
    public static final byte D = (byte) 'D';
 
    public static final byte E = (byte) 'E';

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
index e4d2637..a0d26c1 100644
--- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
+++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
@@ -172,18 +172,18 @@ public class StompFrame
          this.val = val;
       }
 
-      public String getEscapedKey()
+      public String getEncodedKey()
       {
-         return escape(key);
+         return encode(key);
       }
 
-      public String getEscapedValue()
+      public String getEncodedValue()
       {
-         return escape(val);
+         return encode(val);
       }
    }
 
-   public String escape(String str)
+   public String encode(String str)
    {
       int len = str.length();
 
@@ -192,21 +192,35 @@ public class StompFrame
       for (int i = 0; i < len; i++)
       {
          char c = str.charAt(i);
-         if (c == '\n')
+
+         // \n
+         if (c == (byte) 10)
          {
-            buffer[iBuffer++] = '\\';
-            buffer[iBuffer] = 'n';
+            buffer[iBuffer] = (byte) 92;
+            buffer[++iBuffer] = (byte) 110;
          }
-         else if (c == '\\')
+
+         // \r
+         else if (c == (byte) 13)
          {
-            buffer[iBuffer++] = '\\';
-            buffer[iBuffer] = '\\';
+            buffer[iBuffer] = (byte) 92;
+            buffer[++iBuffer] = (byte) 114;
          }
-         else if (c == ':')
+
+         // \
+         else if (c == (byte) 92)
          {
-            buffer[iBuffer++] = '\\';
-            buffer[iBuffer] = ':';
+            buffer[iBuffer] = (byte) 92;
+            buffer[++iBuffer] = (byte) 92;
          }
+
+         // :
+         else if (c == (byte) 58)
+         {
+            buffer[iBuffer] = (byte) 92;
+            buffer[++iBuffer] = (byte) 99;
+         }
+
          else
          {
             buffer[iBuffer] = c;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
index a5e62fe..a8e6f3e 100644
--- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
+++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
@@ -28,8 +28,8 @@ import org.hornetq.api.core.HornetQExceptionType;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.postoffice.BindingType;
 import org.hornetq.core.remoting.impl.netty.NettyServerConnection;
@@ -42,6 +42,7 @@ import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationListener;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.MessageConverter;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.remoting.Acceptor;
@@ -92,6 +93,12 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
       }
    }
 
+   @Override
+   public MessageConverter getConverter()
+   {
+      return null;
+   }
+
    // ProtocolManager implementation --------------------------------
 
    public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
@@ -236,7 +243,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
                                                       false,
                                                       false,
                                                       null,
-                                                      stompSession);
+                                                      stompSession, null);
          stompSession.setServerSession(session);
          sessions.put(connection.getID(), stompSession);
       }
@@ -261,7 +268,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
                                                       false,
                                                       false,
                                                       null,
-                                                      stompSession);
+                                                      stompSession, null);
          stompSession.setServerSession(session);
          transactedSessions.put(txID, stompSession);
       }
@@ -365,7 +372,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
 
    public ServerMessageImpl createServerMessage()
    {
-      return new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+      return new ServerMessageImpl(server.getStorageManager().generateID(), 512);
    }
 
    public void commitTransaction(StompConnection connection, String txID) throws Exception
@@ -402,7 +409,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
          throw new HornetQStompException("There already is a subscription for: " + subscriptionID +
                                             ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
       }
-      long consumerID = server.getStorageManager().generateUniqueID();
+      long consumerID = server.getStorageManager().generateID();
       String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
       stompSession.addSubscription(consumerID,
                                    subscriptionID,
@@ -450,7 +457,10 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
    @Override
    public void onNotification(Notification notification)
    {
-      NotificationType type = notification.getType();
+      if (!(notification.getType() instanceof CoreNotificationType)) return;
+
+      CoreNotificationType type = (CoreNotificationType) notification.getType();
+
       TypedProperties props = notification.getProperties();
 
       switch (type)

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
index 9109541..afc8b28 100644
--- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
+++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
@@ -30,6 +30,7 @@ import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.QueueQueryResult;
+import org.hornetq.core.server.ServerConsumer;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -86,6 +87,12 @@ public class StompSession implements SessionCallback
       return session;
    }
 
+   @Override
+   public boolean hasCredits(ServerConsumer consumerID)
+   {
+      return true;
+   }
+
    public void sendProducerCreditsMessage(int credits, SimpleString address)
    {
    }
@@ -94,13 +101,13 @@ public class StompSession implements SessionCallback
    {
    }
 
-   public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
+   public int sendMessage(ServerMessage serverMessage, ServerConsumer consumer, int deliveryCount)
    {
       LargeServerMessageImpl largeMessage = null;
       ServerMessage newServerMessage = serverMessage;
       try
       {
-         StompSubscription subscription = subscriptions.get(consumerID);
+         StompSubscription subscription = subscriptions.get(consumer.getID());
          StompFrame frame = null;
          if (serverMessage.isLargeMessage())
          {
@@ -146,13 +153,13 @@ public class StompSession implements SessionCallback
             if (manager.send(connection, frame))
             {
                //we ack and commit only if the send is successful
-               session.acknowledge(consumerID, newServerMessage.getMessageID());
+               session.acknowledge(consumer.getID(), newServerMessage.getMessageID());
                session.commit();
             }
          }
          else
          {
-            messagesToAck.put(newServerMessage.getMessageID(), new Pair<Long, Integer>(consumerID, length));
+            messagesToAck.put(newServerMessage.getMessageID(), new Pair<Long, Integer>(consumer.getID(), length));
             // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
             manager.send(connection, frame);
          }
@@ -174,12 +181,12 @@ public class StompSession implements SessionCallback
 
    }
 
-   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, boolean requiresResponse)
    {
       return 0;
    }
 
-   public int sendLargeMessage(ServerMessage msg, long consumerID, long bodySize, int deliveryCount)
+   public int sendLargeMessage(ServerMessage msg, ServerConsumer consumer, long bodySize, int deliveryCount)
    {
       return 0;
    }
@@ -199,9 +206,9 @@ public class StompSession implements SessionCallback
    }
 
    @Override
-   public void disconnect(long consumerId, String queueName)
+   public void disconnect(ServerConsumer consumerId, String queueName)
    {
-      StompSubscription stompSubscription = subscriptions.remove(consumerId);
+      StompSubscription stompSubscription = subscriptions.remove(consumerId.getID());
       if (stompSubscription != null)
       {
          StompFrame frame = connection.getFrameHandler().createStompFrame(StompCommands.ERROR.toString());
@@ -380,7 +387,7 @@ public class StompSession implements SessionCallback
       }
 
       StorageManager storageManager = ((ServerSessionImpl) session).getStorageManager();
-      long id = storageManager.generateUniqueID();
+      long id = storageManager.generateID();
       LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message);
 
       byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - MessageImpl.BODY_OFFSET];

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
index 483924d..07b9107 100644
--- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
+++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
@@ -730,24 +730,15 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
                }
                case HEADER_SEPARATOR:
                {
-                  if (isEscaping)
+                  if (inHeaderName)
                   {
-                     //a colon
-                     holder.append(b);
-                     isEscaping = false;
-                  }
-                  else
-                  {
-                     if (inHeaderName)
-                     {
-                        headerName = holder.getString();
+                     headerName = holder.getString();
 
-                        holder.reset();
+                     holder.reset();
 
-                        inHeaderName = false;
+                     inHeaderName = false;
 
-                        headerValueWhitespace = true;
-                     }
+                     headerValueWhitespace = true;
                   }
 
                   whiteSpaceOnly = false;
@@ -767,6 +758,19 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
                   }
                   break;
                }
+               case StompDecoder.c:
+               {
+                  if (isEscaping)
+                  {
+                     holder.append(StompDecoder.HEADER_SEPARATOR);
+                     isEscaping = false;
+                  }
+                  else
+                  {
+                     holder.append(b);
+                  }
+                  break;
+               }
                case StompDecoder.NEW_LINE:
                {
                   if (whiteSpaceOnly)

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
index 9bc5b10..2cfc426 100644
--- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
+++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
@@ -62,9 +62,9 @@ public class StompFrameV11 extends StompFrame
          // Output the headers.
          for (Header h : allHeaders)
          {
-            head.append(h.getEscapedKey());
+            head.append(h.getEncodedKey());
             head.append(Stomp.Headers.SEPARATOR);
-            head.append(h.getEscapedValue());
+            head.append(h.getEncodedValue());
             head.append(Stomp.NEWLINE);
          }
          // Add a newline to separate the headers from the content.

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameHandlerV12.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameHandlerV12.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameHandlerV12.java
index 36a5dbe..3ca1701 100644
--- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -166,24 +166,15 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 implements FrameE
                }
                case HEADER_SEPARATOR:
                {
-                  if (isEscaping)
+                  if (inHeaderName)
                   {
-                     //a colon
-                     holder.append(b);
-                     isEscaping = false;
-                  }
-                  else
-                  {
-                     if (inHeaderName)
-                     {
-                        headerName = holder.getString();
+                     headerName = holder.getString();
 
-                        holder.reset();
+                     holder.reset();
 
-                        inHeaderName = false;
+                     inHeaderName = false;
 
-                        headerValueWhitespace = true;
-                     }
+                     headerValueWhitespace = true;
                   }
 
                   whiteSpaceOnly = false;
@@ -225,6 +216,19 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 implements FrameE
                   nextEOLChar = true;
                   break;
                }
+               case StompDecoder.c:
+               {
+                  if (isEscaping)
+                  {
+                     holder.append(StompDecoder.HEADER_SEPARATOR);
+                     isEscaping = false;
+                  }
+                  else
+                  {
+                     holder.append(b);
+                  }
+                  break;
+               }
                case NEW_LINE:
                {
                   nextEOLChar = false;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameV12.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameV12.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameV12.java
index 4dafa0c..2be74f1 100644
--- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameV12.java
+++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameV12.java
@@ -62,9 +62,9 @@ public class StompFrameV12 extends StompFrame
          // Output the headers.
          for (Header h : allHeaders)
          {
-            head.append(h.getEscapedKey());
+            head.append(h.getEncodedKey());
             head.append(Stomp.Headers.SEPARATOR);
-            head.append(h.getEscapedValue());
+            head.append(h.getEncodedValue());
             head.append(Stomp.NEWLINE);
          }
 
@@ -104,49 +104,4 @@ public class StompFrameV12 extends StompFrame
          allHeaders.add(new Header(key, val));
       }
    }
-
-   @Override
-   public String escape(String str)
-   {
-      int len = str.length();
-
-      char[] buffer = new char[2 * len];
-      int iBuffer = 0;
-      for (int i = 0; i < len; i++)
-      {
-         char c = str.charAt(i);
-         if (c == '\r')
-         {
-            buffer[iBuffer++] = '\\';
-            buffer[iBuffer] = 'r';
-         }
-         else if (c == '\n')
-         {
-            buffer[iBuffer++] = '\\';
-            buffer[iBuffer] = 'n';
-         }
-         else if (c == '\\')
-         {
-            buffer[iBuffer++] = '\\';
-            buffer[iBuffer] = '\\';
-         }
-         else if (c == ':')
-         {
-            buffer[iBuffer++] = '\\';
-            buffer[iBuffer] = ':';
-         }
-         else
-         {
-            buffer[iBuffer] = c;
-         }
-         iBuffer++;
-      }
-
-      char[] total = new char[iBuffer];
-      System.arraycopy(buffer, 0, total, 0, iBuffer);
-
-      return new String(total);
-   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/pom.xml
----------------------------------------------------------------------
diff --git a/hornetq-protocols/pom.xml b/hornetq-protocols/pom.xml
index 54e7012..1ecde53 100644
--- a/hornetq-protocols/pom.xml
+++ b/hornetq-protocols/pom.xml
@@ -19,6 +19,8 @@
    <modules>
       <module>hornetq-amqp-protocol</module>
       <module>hornetq-stomp-protocol</module>
+      <module>hornetq-openwire-protocol</module>
+      <module>hornetq-proton-plug</module>
    </modules>
 
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/pom.xml
----------------------------------------------------------------------
diff --git a/hornetq-ra/pom.xml b/hornetq-ra/pom.xml
index fc9ad51..561af3e 100644
--- a/hornetq-ra/pom.xml
+++ b/hornetq-ra/pom.xml
@@ -52,6 +52,21 @@
          <artifactId>jboss-jca-api</artifactId>
          <scope>provided</scope>
       </dependency>
+      <dependency>
+         <groupId>org.jboss.ironjacamar</groupId>
+         <artifactId>ironjacamar-core-api</artifactId>
+         <scope>provided</scope>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.spec.javax.transaction</groupId>
+         <artifactId>jboss-transaction-api_1.2_spec</artifactId>
+         <scope>provided</scope>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss</groupId>
+         <artifactId>jboss-transaction-spi</artifactId>
+         <scope>provided</scope>
+      </dependency>
    </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java
----------------------------------------------------------------------
diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java
index b51c388..055b3e1 100644
--- a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java
+++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.jms.client.HornetQConnection;
 import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.hornetq.jms.client.HornetQXAConnection;
@@ -51,6 +52,7 @@ import org.hornetq.jms.client.HornetQXAConnection;
  *
  * @author <a href="mailto:[email protected]";>Adrian Brock</a>
  * @author <a href="mailto:[email protected]";>Jesper Pedersen</a>
+ * @author <a href="mailto:[email protected]";>Martyn Taylor</a>
  */
 public final class HornetQRAManagedConnection implements ManagedConnection, ExceptionListener
 {
@@ -292,7 +294,7 @@ public final class HornetQRAManagedConnection implements ManagedConnection, Exce
       try
       {
          /**
-          * (xa|nonXA)Session.close() may NOT be called BEFORE connection.close().
+          * (xa|nonXA)Session.close() may NOT be called BEFORE connection.close()
           * <p>
           * If the ClientSessionFactory is trying to fail-over or reconnect with -1 attempts, and
           * one calls session.close() it may effectively dead-lock.
@@ -522,12 +524,14 @@ public final class HornetQRAManagedConnection implements ManagedConnection, Exce
       }
 
       //
-      // Spec says a mc must allways return the same XA resource,
+      // Spec says a mc must always return the same XA resource,
       // so we cache it.
       //
       if (xaResource == null)
       {
-         xaResource = new HornetQRAXAResource(this, xaSession.getXAResource());
+         ClientSessionInternal csi = (ClientSessionInternal) xaSession.getXAResource();
+         HornetQRAXAResource hqXAResource = new HornetQRAXAResource(this, xaSession.getXAResource());
+         xaResource = new HornetQXAResourceWrapper(hqXAResource, ra.getJndiName(), csi.getNodeId());
       }
 
       if (HornetQRAManagedConnection.trace)

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASession.java
----------------------------------------------------------------------
diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASession.java b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASession.java
index d03f602..e4ace2b 100644
--- a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASession.java
+++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASession.java
@@ -49,12 +49,16 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.jms.client.HornetQSession;
+
 
 /**
  * A joint interface for JMS sessions
  *
  * @author <a href="mailto:[email protected]";>Adrian Brock</a>
  * @author <a href="mailto:[email protected]";>Jesper Pedersen</a>
+ * @author <a href="mailto:[email protected]";>Martyn Taylor</a>
  */
 public final class HornetQRASession implements QueueSession, TopicSession, XAQueueSession, XATopicSession
 {
@@ -1273,6 +1277,18 @@ public final class HornetQRASession implements QueueSession, TopicSession, XAQue
    }
 
    /**
+    * Returns the ID of the Node that this session is associated with.
+    *
+    * @return Node ID
+    */
+   public String getNodeId() throws JMSException
+   {
+      HornetQSession session = (HornetQSession) getSessionInternal();
+      ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) session.getCoreSession().getSessionFactory();
+      return factory.getLiveNodeId();
+   }
+
+   /**
     * Get the session
     *
     * @return The session

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASessionFactoryImpl.java b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASessionFactoryImpl.java
index 80493bc..58a88a4 100644
--- a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASessionFactoryImpl.java
+++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASessionFactoryImpl.java
@@ -923,10 +923,10 @@ public final class HornetQRASessionFactoryImpl extends HornetQConnectionForConte
                      break;
                   //The value {@code Session.CLIENT_ACKNOWLEDGE} may not be used.
                   case Session.CLIENT_ACKNOWLEDGE:
-                     throw HornetQRABundle.BUNDLE.invalidSessionTransactedModeRuntime();
+                     throw HornetQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime();
                      //same with this although the spec doesn't explicitly say
                   case Session.SESSION_TRANSACTED:
-                     throw HornetQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime();
+                     throw HornetQRABundle.BUNDLE.invalidSessionTransactedModeRuntime();
                   default:
                      throw HornetQRABundle.BUNDLE.invalidAcknowledgeMode(acknowledgeMode);
                }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAXAResource.java
----------------------------------------------------------------------
diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAXAResource.java b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAXAResource.java
index 31cbe75..053b4f2 100644
--- a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAXAResource.java
+++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAXAResource.java
@@ -18,6 +18,7 @@ import javax.transaction.xa.Xid;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.client.impl.HornetQXAResource;
 
 /**
  * HornetQXAResource.
@@ -25,7 +26,7 @@ import org.hornetq.core.client.impl.ClientSessionInternal;
  * @author <a href="[email protected]">Adrian Brock</a>
  * @author <a href="[email protected]">Jesper Pedersen</a>
  */
-public class HornetQRAXAResource implements XAResource
+public class HornetQRAXAResource implements HornetQXAResource
 {
    /** Trace enabled */
    private static boolean trace = HornetQRALogger.LOGGER.isTraceEnabled();
@@ -247,4 +248,10 @@ public class HornetQRAXAResource implements XAResource
 
       return xaResource.setTransactionTimeout(seconds);
    }
+
+   @Override
+   public XAResource getResource()
+   {
+      return xaResource;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java
----------------------------------------------------------------------
diff --git 
a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java 
b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java
index c6399ac..d72ee44 100644
--- a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java
+++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.hornetq.api.core.BroadcastEndpointFactoryConfiguration;
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -56,12 +58,18 @@ import org.jgroups.JChannel;
  * @author <a href="[email protected]">Adrian Brock</a>
  * @author <a href="[email protected]">Jesper Pedersen</a>
  * @author <a href="mailto:[email protected]">Andy Taylor</a>
+ * @author <a href="mailto:[email protected]">Martyn Taylor</a>
  */
 public class HornetQResourceAdapter implements ResourceAdapter, Serializable
 {
    private static final long serialVersionUID = 4756893709825838770L;
 
    /**
+    * The Name of the product that this resource adapter represents.
+    */
+   public static final String PRODUCT_NAME = "HornetQ";
+
+   /**
     * Trace enabled
     */
    private static boolean trace = HornetQRALogger.LOGGER.isTraceEnabled();
@@ -111,6 +119,8 @@ public class HornetQResourceAdapter implements 
ResourceAdapter, Serializable
 
    private final List<HornetQRAManagedConnectionFactory> 
managedConnectionFactories = new ArrayList<HornetQRAManagedConnectionFactory>();
 
+   private String entries;
+
    /**
     * Constructor
     */
@@ -371,6 +381,29 @@ public class HornetQResourceAdapter implements 
ResourceAdapter, Serializable
       this.raProperties.setHA(ha);
    }
 
+   public String getEntries()
+   {
+      return entries;
+   }
+
+   public String getJndiName()
+   {
+      if (!(entries == null || entries.isEmpty()))
+      {
+         Matcher m = Pattern.compile("\"(.*?)\"").matcher(entries);
+         if (m.find())
+         {
+            return m.group(1);
+         }
+      }
+      return null;
+   }
+
+   public void setEntries(String entries)
+   {
+      this.entries = entries;
+   }
+
    /**
     * Get the discovery group name
     *
@@ -1890,7 +1923,11 @@ public class HornetQResourceAdapter implements 
ResourceAdapter, Serializable
 
             String localBindAddress = 
overrideProperties.getDiscoveryLocalBindAddress() != null ? 
overrideProperties.getDiscoveryLocalBindAddress()
                : raProperties.getDiscoveryLocalBindAddress();
-            endpointFactoryConfiguration = new 
UDPBroadcastGroupConfiguration(discoveryAddress, discoveryPort, 
localBindAddress, -1);
+            endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration()
+               .setGroupAddress(discoveryAddress)
+               .setGroupPort(discoveryPort)
+               .setLocalBindAddress(localBindAddress)
+               .setLocalBindPort(-1);
          }
          else if (jgroupsFileName != null)
          {
@@ -1911,7 +1948,10 @@ public class HornetQResourceAdapter implements 
ResourceAdapter, Serializable
             initialTimeout = 
HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
          }
 
-         DiscoveryGroupConfiguration groupConfiguration = new 
DiscoveryGroupConfiguration(refreshTimeout, initialTimeout, 
endpointFactoryConfiguration);
+         DiscoveryGroupConfiguration groupConfiguration = new 
DiscoveryGroupConfiguration()
+            .setRefreshTimeout(refreshTimeout)
+            .setDiscoveryInitialWaitTimeout(initialTimeout)
+            
.setBroadcastEndpointFactoryConfiguration(endpointFactoryConfiguration);
 
          if (HornetQRALogger.LOGGER.isDebugEnabled())
          {
@@ -2011,7 +2051,11 @@ public class HornetQResourceAdapter implements 
ResourceAdapter, Serializable
 
             String localBindAddress = 
overrideProperties.getDiscoveryLocalBindAddress() != null ? 
overrideProperties.getDiscoveryLocalBindAddress()
                : raProperties.getDiscoveryLocalBindAddress();
-            endpointFactoryConfiguration = new 
UDPBroadcastGroupConfiguration(discoveryAddress, discoveryPort, 
localBindAddress, -1);
+            endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration()
+               .setGroupAddress(discoveryAddress)
+               .setGroupPort(discoveryPort)
+               .setLocalBindAddress(localBindAddress)
+               .setLocalBindPort(-1);
          }
          else if (jgroupsFileName != null)
          {
@@ -2045,7 +2089,10 @@ public class HornetQResourceAdapter implements 
ResourceAdapter, Serializable
          {
             initialTimeout = 
HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
          }
-         DiscoveryGroupConfiguration groupConfiguration = new 
DiscoveryGroupConfiguration(refreshTimeout, initialTimeout, 
endpointFactoryConfiguration);
+         DiscoveryGroupConfiguration groupConfiguration = new 
DiscoveryGroupConfiguration()
+            .setRefreshTimeout(refreshTimeout)
+            .setDiscoveryInitialWaitTimeout(initialTimeout)
+            
.setBroadcastEndpointFactoryConfiguration(endpointFactoryConfiguration);
 
          groupConfiguration.setRefreshTimeout(refreshTimeout);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQXAResourceWrapper.java
----------------------------------------------------------------------
diff --git 
a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQXAResourceWrapper.java 
b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQXAResourceWrapper.java
new file mode 100644
index 0000000..043f35b
--- /dev/null
+++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQXAResourceWrapper.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.ra;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.client.impl.HornetQXAResource;
+import org.hornetq.utils.VersionLoader;
+
+/**
+ * @author <a href="mailto:[email protected]">Martyn Taylor</a>
+ *
+ * Wraps XAResource with org.jboss.tm.XAResourceWrapper.  This adds extra 
meta-data to the XAResource used by
+ * Transaction Manager for recovery scenarios.
+ */
+
+public class HornetQXAResourceWrapper implements 
org.jboss.tm.XAResourceWrapper, 
org.jboss.jca.core.spi.transaction.xa.XAResourceWrapper, HornetQXAResource
+{
+   private final XAResource xaResource;
+
+   // The EIS Name
+   private final String productName;
+
+   // The EIS Version
+   private final String productVersion;
+
+   // A composite of NodeID + JNDIName that allows an administrator looking at an 
XAResource to determine its origin.
+   private final String jndiNameNodeId;
+
+   /**
+    * Creates a new XAResourceWrapper.  PRODUCT_NAME, productVersion and 
jndiName are useful for log output in the
+    * Transaction Manager.  For HornetQ only the resourceManagerID is required 
to allow Transaction Manager to recover
+    * from relevant recovery scenarios.
+    *
+    * @param xaResource
+    * @param jndiName
+    */
+   public HornetQXAResourceWrapper(XAResource xaResource, String jndiName, 
String nodeId)
+   {
+      this.xaResource = xaResource;
+      this.productName = HornetQResourceAdapter.PRODUCT_NAME;
+      this.productVersion = VersionLoader.getVersion().getFullVersion();
+      this.jndiNameNodeId = jndiName + " NodeId:" + nodeId;
+   }
+
+   @Override
+   public XAResource getResource()
+   {
+      return xaResource;
+   }
+
+   @Override
+   public String getProductName()
+   {
+      return productName;
+   }
+
+   @Override
+   public String getProductVersion()
+   {
+      return productVersion;
+   }
+
+   @Override
+   public String getJndiName()
+   {
+      return jndiNameNodeId;
+   }
+
+   @Override
+   public void commit(Xid xid, boolean b) throws XAException
+   {
+      getResource().commit(xid, b);
+   }
+
+   @Override
+   public void end(Xid xid, int i) throws XAException
+   {
+      getResource().end(xid, i);
+   }
+
+   @Override
+   public void forget(Xid xid) throws XAException
+   {
+      getResource().forget(xid);
+   }
+
+   @Override
+   public int getTransactionTimeout() throws XAException
+   {
+      return getResource().getTransactionTimeout();
+   }
+
+   @Override
+   public boolean isSameRM(XAResource xaResource) throws XAException
+   {
+      return getResource().isSameRM(xaResource);
+   }
+
+   @Override
+   public int prepare(Xid xid) throws XAException
+   {
+      return getResource().prepare(xid);
+   }
+
+   @Override
+   public Xid[] recover(int i) throws XAException
+   {
+      return getResource().recover(i);
+   }
+
+   @Override
+   public void rollback(Xid xid) throws XAException
+   {
+      getResource().rollback(xid);
+   }
+
+   @Override
+   public boolean setTransactionTimeout(int i) throws XAException
+   {
+      return getResource().setTransactionTimeout(i);
+   }
+
+   @Override
+   public void start(Xid xid, int i) throws XAException
+   {
+      getResource().start(xid, i);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java
----------------------------------------------------------------------
diff --git 
a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java 
b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java
index d0a17a4..d75be85 100644
--- a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java
+++ b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java
@@ -46,6 +46,7 @@ import org.hornetq.ra.HornetQRAConnectionFactory;
 import org.hornetq.ra.HornetQRALogger;
 import org.hornetq.ra.HornetQRaUtils;
 import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.utils.FutureLatch;
 import org.hornetq.utils.SensitiveDataCodec;
 
 /**
@@ -401,9 +402,33 @@ public class HornetQActivation
 
       handlers.clear();
 
+      FutureLatch future = new FutureLatch(handlersCopy.length);
+      List<Thread> interruptThreads = new ArrayList<>();
       for (HornetQMessageHandler handler : handlersCopy)
       {
-         handler.interruptConsumer();
+         Thread thread = handler.interruptConsumer(future);
+         if (thread != null)
+         {
+            interruptThreads.add(thread);
+         }
+      }
+
+      //wait for all the consumers to complete any onmessage calls
+      boolean stuckThreads = !future.await(factory.getCallTimeout());
+      //if any are stuck then we need to interrupt them
+      if (stuckThreads)
+      {
+         for (Thread interruptThread : interruptThreads)
+         {
+            try
+            {
+               interruptThread.interrupt();
+            }
+            catch (Exception e)
+            {
+               //ok
+            }
+         }
       }
 
       Thread threadTearDown = new Thread("TearDown/HornetQActivation")

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java
----------------------------------------------------------------------
diff --git 
a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java 
b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java
index ed40b94..f9ce2f2 100644
--- a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java
+++ b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java
@@ -12,13 +12,20 @@
  */
 package org.hornetq.ra.inflow;
 
+import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.Topic;
 import javax.resource.ResourceException;
 import javax.resource.spi.ActivationSpec;
 import javax.resource.spi.InvalidPropertyException;
 import javax.resource.spi.ResourceAdapter;
+import java.beans.IntrospectionException;
+import java.beans.PropertyDescriptor;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
 
 import org.hornetq.ra.ConnectionFactoryProperties;
 import org.hornetq.ra.HornetQRALogger;
@@ -773,9 +780,47 @@ public class HornetQActivationSpec extends 
ConnectionFactoryProperties implement
          HornetQRALogger.LOGGER.trace("validate()");
       }
 
-      if (destination == null || destination.trim().equals(""))
+      List<String> errorMessages = new ArrayList<String>();
+      List<PropertyDescriptor> propsNotSet = new 
ArrayList<PropertyDescriptor>();
+
+      try
+      {
+         if (destination == null || destination.trim().equals(""))
+         {
+            propsNotSet.add(new PropertyDescriptor("destination", 
HornetQActivationSpec.class));
+            errorMessages.add("Destination is mandatory.");
+         }
+
+         if (destinationType != null && 
!Topic.class.getName().equals(destinationType) && 
!Queue.class.getName().equals(destinationType))
+         {
+            propsNotSet.add(new PropertyDescriptor("destinationType", 
HornetQActivationSpec.class));
+            errorMessages.add("If set, the destinationType must be either 
'javax.jms.Topic' or 'javax.jms.Queue'.");
+         }
+
+         if ((destinationType == null || destinationType.length() == 0 || 
Topic.class.getName().equals(destinationType)) && isSubscriptionDurable() && 
(subscriptionName == null || subscriptionName.length() == 0))
+         {
+            propsNotSet.add(new PropertyDescriptor("subscriptionName", 
HornetQActivationSpec.class));
+            errorMessages.add("If subscription is durable then subscription 
name must be specified.");
+         }
+      }
+      catch (IntrospectionException e)
+      {
+         e.printStackTrace();
+      }
+
+      if (propsNotSet.size() > 0)
       {
-         throw new InvalidPropertyException("Destination is mandatory");
+         StringBuffer b = new StringBuffer();
+         b.append("Invalid settings:");
+         for (Iterator<String> iter = errorMessages.iterator(); 
iter.hasNext();)
+         {
+            b.append(" ");
+            b.append(iter.next());
+         }
+         InvalidPropertyException e = new 
InvalidPropertyException(b.toString());
+         final PropertyDescriptor[] descriptors = propsNotSet.toArray(new 
PropertyDescriptor[propsNotSet.size()]);
+         e.setInvalidPropertyDescriptors(descriptors);
+         throw e;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQMessageHandler.java
----------------------------------------------------------------------
diff --git 
a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQMessageHandler.java 
b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQMessageHandler.java
index 82b811d..eeb5b06 100644
--- a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQMessageHandler.java
+++ b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQMessageHandler.java
@@ -28,10 +28,14 @@ import org.hornetq.api.core.client.ClientSession.QueueQuery;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.core.client.impl.ClientConsumerInternal;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.jms.client.HornetQDestination;
 import org.hornetq.jms.client.HornetQMessage;
 import org.hornetq.ra.HornetQRALogger;
+import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.ra.HornetQXAResourceWrapper;
+import org.hornetq.utils.FutureLatch;
 
 /**
  * The message handler
@@ -39,6 +43,7 @@ import org.hornetq.ra.HornetQRALogger;
  * @author <a href="[email protected]">Adrian Brock</a>
  * @author <a href="mailto:[email protected]">Jesper Pedersen</a>
  * @author <a href="mailto:[email protected]">Andy Taylor</a>
+ * @author <a href="mailto:[email protected]">Martyn Taylor</a>
  */
 public class HornetQMessageHandler implements MessageHandler
 {
@@ -99,11 +104,9 @@ public class HornetQMessageHandler implements MessageHandler
       SimpleString selectorString = selector == null || 
selector.trim().equals("") ? null : new SimpleString(selector);
       if (activation.isTopic() && spec.isSubscriptionDurable())
       {
-         String subscriptionName = spec.getSubscriptionName();
-         String clientID = spec.getClientID();
-
-         SimpleString queueName = new 
SimpleString(HornetQDestination.createQueueNameForDurableSubscription(true, 
clientID,
-                                                                               
                             subscriptionName));
+         SimpleString queueName = new 
SimpleString(HornetQDestination.createQueueNameForDurableSubscription(true,
+                                                                               
                             spec.getClientID(),
+                                                                               
                             spec.getSubscriptionName()));
 
          QueueQuery subResponse = session.queueQuery(queueName);
 
@@ -188,7 +191,10 @@ public class HornetQMessageHandler implements 
MessageHandler
       transacted = activation.isDeliveryTransacted();
       if (activation.isDeliveryTransacted() && 
!activation.getActivationSpec().isUseLocalTx())
       {
-         endpoint = endpointFactory.createEndpoint(session);
+         XAResource xaResource = new HornetQXAResourceWrapper(session,
+                                                     ((HornetQResourceAdapter) 
spec.getResourceAdapter()).getJndiName(),
+                                                     
((ClientSessionFactoryInternal) cf).getLiveNodeId());
+         endpoint = endpointFactory.createEndpoint(xaResource);
          useXA = true;
       }
       else
@@ -204,19 +210,20 @@ public class HornetQMessageHandler implements 
MessageHandler
       return useXA ? session : null;
    }
 
-   public void interruptConsumer()
+   public Thread interruptConsumer(FutureLatch future)
    {
       try
       {
          if (consumer != null)
          {
-            consumer.interruptHandlers();
+            return consumer.prepareForClose(future);
          }
       }
       catch (Throwable e)
       {
          HornetQRALogger.LOGGER.warn("Error interrupting handler on endpoint " 
+ endpoint + " handler=" + consumer);
       }
+      return null;
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-rest/src/test/java/org/hornetq/rest/test/Embedded.java
----------------------------------------------------------------------
diff --git a/hornetq-rest/src/test/java/org/hornetq/rest/test/Embedded.java 
b/hornetq-rest/src/test/java/org/hornetq/rest/test/Embedded.java
index b51bd8b..aa6aff1 100644
--- a/hornetq-rest/src/test/java/org/hornetq/rest/test/Embedded.java
+++ b/hornetq-rest/src/test/java/org/hornetq/rest/test/Embedded.java
@@ -78,10 +78,10 @@ public class Embedded
       System.out.println("\nStarting Embedded");
       if (hornetqServer == null)
       {
-         Configuration configuration = new ConfigurationImpl();
-         configuration.setPersistenceEnabled(false);
-         configuration.setSecurityEnabled(false);
-         configuration.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+         Configuration configuration = new ConfigurationImpl()
+            .setPersistenceEnabled(false)
+            .setSecurityEnabled(false)
+            .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
 
          hornetqServer = HornetQServers.newHornetQServer(configuration);
          hornetqServer.start();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java
 
b/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java
index dd1111a..bbf4ba6 100644
--- 
a/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java
+++ 
b/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java
@@ -46,10 +46,10 @@ public class PersistentPushQueueConsumerTest
 
    public static void startup() throws Exception
    {
-      Configuration configuration = new ConfigurationImpl();
-      configuration.setPersistenceEnabled(false);
-      configuration.setSecurityEnabled(false);
-      configuration.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      Configuration configuration = new ConfigurationImpl()
+         .setPersistenceEnabled(false)
+         .setSecurityEnabled(false)
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
 
       hornetqServer = HornetQServers.newHornetQServer(configuration);
       hornetqServer.start();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java
 
b/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java
index e21fdda..135286a 100644
--- 
a/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java
+++ 
b/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java
@@ -57,10 +57,10 @@ public class PersistentPushTopicConsumerTest
    @BeforeClass
    public static void setup() throws Exception
    {
-      Configuration configuration = new ConfigurationImpl();
-      configuration.setPersistenceEnabled(false);
-      configuration.setSecurityEnabled(false);
-      configuration.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      Configuration configuration = new ConfigurationImpl()
+         .setPersistenceEnabled(false)
+         .setSecurityEnabled(false)
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
 
       server = HornetQServers.newHornetQServer(configuration);
       server.start();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-rest/src/test/java/org/hornetq/rest/test/RawAckTest.java
----------------------------------------------------------------------
diff --git a/hornetq-rest/src/test/java/org/hornetq/rest/test/RawAckTest.java 
b/hornetq-rest/src/test/java/org/hornetq/rest/test/RawAckTest.java
index 21a5dae..a3eed8c 100644
--- a/hornetq-rest/src/test/java/org/hornetq/rest/test/RawAckTest.java
+++ b/hornetq-rest/src/test/java/org/hornetq/rest/test/RawAckTest.java
@@ -53,10 +53,10 @@ public class RawAckTest
    @BeforeClass
    public static void setup() throws Exception
    {
-      Configuration configuration = new ConfigurationImpl();
-      configuration.setPersistenceEnabled(false);
-      configuration.setSecurityEnabled(false);
-      configuration.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      Configuration configuration = new ConfigurationImpl()
+         .setPersistenceEnabled(false)
+         .setSecurityEnabled(false)
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
 
       hornetqServer = HornetQServers.newHornetQServer(configuration);
       hornetqServer.start();

Reply via email to