http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAStreamMessage.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAStreamMessage.java 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAStreamMessage.java
new file mode 100644
index 0000000..4fcb0c4
--- /dev/null
+++ 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAStreamMessage.java
@@ -0,0 +1,408 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import java.util.Arrays;
+
+import javax.jms.JMSException;
+import javax.jms.StreamMessage;
+
+
+/**
+ * A wrapper for a {@link StreamMessage}: every call is optionally traced and
+ * then forwarded to the wrapped message.
+ *
+ * @author <a href="mailto:[email protected]">Adrian Brock</a>
+ * @author <a href="mailto:[email protected]">Jesper Pedersen</a>
+ */
+public class ActiveMQRAStreamMessage extends ActiveMQRAMessage implements StreamMessage
+{
+   /** Whether trace is enabled; read from the logger once, when this class is initialized */
+   private static final boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * @param message the message to wrap
+    * @param session the session passed on to the superclass
+    */
+   public ActiveMQRAStreamMessage(final StreamMessage message, final ActiveMQRASession session)
+   {
+      super(message, session);
+
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + message + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Read a boolean from the wrapped message
+    * @return The value returned by the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public boolean readBoolean() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readBoolean()");
+      }
+
+      return ((StreamMessage)message).readBoolean();
+   }
+
+   /**
+    * Read a byte from the wrapped message
+    * @return The value returned by the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public byte readByte() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readByte()");
+      }
+
+      return ((StreamMessage)message).readByte();
+   }
+
+   /**
+    * Read bytes from the wrapped message into the supplied buffer.
+    * The buffer's current content is traced before the read takes place.
+    * @param value The buffer handed to the wrapped message
+    * @return The result of the wrapped message's {@code readBytes} call
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public int readBytes(final byte[] value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readBytes(" + Arrays.toString(value) + ")");
+      }
+
+      return ((StreamMessage)message).readBytes(value);
+   }
+
+   /**
+    * Read a char from the wrapped message
+    * @return The value returned by the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public char readChar() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readChar()");
+      }
+
+      return ((StreamMessage)message).readChar();
+   }
+
+   /**
+    * Read a double from the wrapped message
+    * @return The value returned by the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public double readDouble() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readDouble()");
+      }
+
+      return ((StreamMessage)message).readDouble();
+   }
+
+   /**
+    * Read a float from the wrapped message
+    * @return The value returned by the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public float readFloat() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readFloat()");
+      }
+
+      return ((StreamMessage)message).readFloat();
+   }
+
+   /**
+    * Read an int from the wrapped message
+    * @return The value returned by the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public int readInt() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readInt()");
+      }
+
+      return ((StreamMessage)message).readInt();
+   }
+
+   /**
+    * Read a long from the wrapped message
+    * @return The value returned by the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public long readLong() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readLong()");
+      }
+
+      return ((StreamMessage)message).readLong();
+   }
+
+   /**
+    * Read an object from the wrapped message
+    * @return The value returned by the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public Object readObject() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readObject()");
+      }
+
+      return ((StreamMessage)message).readObject();
+   }
+
+   /**
+    * Read a short from the wrapped message
+    * @return The value returned by the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public short readShort() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readShort()");
+      }
+
+      return ((StreamMessage)message).readShort();
+   }
+
+   /**
+    * Read a string from the wrapped message
+    * @return The value returned by the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public String readString() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readString()");
+      }
+
+      return ((StreamMessage)message).readString();
+   }
+
+   /**
+    * Forward a reset request to the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void reset() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("reset()");
+      }
+
+      ((StreamMessage)message).reset();
+   }
+
+   /**
+    * Write a boolean to the wrapped message
+    * @param value The value to write
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeBoolean(final boolean value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeBoolean(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeBoolean(value);
+   }
+
+   /**
+    * Write a byte to the wrapped message
+    * @param value The value to write
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeByte(final byte value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeByte(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeByte(value);
+   }
+
+   /**
+    * Write part of a byte array to the wrapped message
+    * @param value The source array
+    * @param offset The offset handed to the wrapped message
+    * @param length The length handed to the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         // Arrays.toString prints the content (and "null" for null), matching readBytes;
+         // plain concatenation would only print the array's identity.
+         ActiveMQRALogger.LOGGER.trace("writeBytes(" + Arrays.toString(value) + ", " + offset + ", " + length + ")");
+      }
+
+      ((StreamMessage)message).writeBytes(value, offset, length);
+   }
+
+   /**
+    * Write a byte array to the wrapped message
+    * @param value The array to write
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeBytes(final byte[] value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         // Arrays.toString prints the content (and "null" for null), matching readBytes;
+         // plain concatenation would only print the array's identity.
+         ActiveMQRALogger.LOGGER.trace("writeBytes(" + Arrays.toString(value) + ")");
+      }
+
+      ((StreamMessage)message).writeBytes(value);
+   }
+
+   /**
+    * Write a char to the wrapped message
+    * @param value The value to write
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeChar(final char value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeChar(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeChar(value);
+   }
+
+   /**
+    * Write a double to the wrapped message
+    * @param value The value to write
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeDouble(final double value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeDouble(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeDouble(value);
+   }
+
+   /**
+    * Write a float to the wrapped message
+    * @param value The value to write
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeFloat(final float value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeFloat(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeFloat(value);
+   }
+
+   /**
+    * Write an int to the wrapped message
+    * @param value The value to write
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeInt(final int value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeInt(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeInt(value);
+   }
+
+   /**
+    * Write a long to the wrapped message
+    * @param value The value to write
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeLong(final long value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeLong(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeLong(value);
+   }
+
+   /**
+    * Write an object to the wrapped message
+    * @param value The value to write
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeObject(final Object value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeObject(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeObject(value);
+   }
+
+   /**
+    * Write a short to the wrapped message
+    * @param value The value to write
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeShort(final short value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeShort(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeShort(value);
+   }
+
+   /**
+    * Write a string to the wrapped message
+    * @param value The value to write
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void writeString(final String value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeString(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeString(value);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATextMessage.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATextMessage.java 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATextMessage.java
new file mode 100644
index 0000000..f326992
--- /dev/null
+++ 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATextMessage.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+
+/**
+ * A wrapper for a {@link TextMessage}: calls are optionally traced and then
+ * forwarded to the wrapped message.
+ *
+ * @author <a href="mailto:[email protected]">Adrian Brock</a>
+ * @author <a href="mailto:[email protected]">Jesper Pedersen</a>
+ */
+public class ActiveMQRATextMessage extends ActiveMQRAMessage implements TextMessage
+{
+   /** Trace flag, read from the logger when this class is initialized */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Wrap a text message
+    * @param message the message to wrap
+    * @param session the session passed on to the superclass
+    */
+   public ActiveMQRATextMessage(final TextMessage message, final ActiveMQRASession session)
+   {
+      super(message, session);
+
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + message + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Get the text of the wrapped message
+    * @return The text returned by the wrapped message
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public String getText() throws JMSException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getText()");
+      }
+
+      final TextMessage delegate = (TextMessage)message;
+      return delegate.getText();
+   }
+
+   /**
+    * Set the text of the wrapped message
+    * @param string The text to set
+    * @exception JMSException Propagated from the wrapped message
+    */
+   public void setText(final String string) throws JMSException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setText(" + string + ")");
+      }
+
+      final TextMessage delegate = (TextMessage)message;
+      delegate.setText(string);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicPublisher.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicPublisher.java
 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicPublisher.java
new file mode 100644
index 0000000..99062c8
--- /dev/null
+++ 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicPublisher.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+
+/**
+ * A wrapper for a topic publisher: calls are optionally traced and then
+ * forwarded to the wrapped {@link TopicPublisher}. Each publish variant runs
+ * between {@code session.lock()} and {@code session.unlock()}.
+ *
+ * @author <a href="[email protected]">Adrian Brock</a>
+ * @author <a href="[email protected]">Jesper Pedersen</a>
+ */
+public class ActiveMQRATopicPublisher extends ActiveMQRAMessageProducer implements TopicPublisher
+{
+   /** Trace flag, read from the logger when this class is initialized */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Wrap a topic publisher
+    * @param producer the publisher to wrap
+    * @param session the session passed on to the superclass
+    */
+   public ActiveMQRATopicPublisher(final TopicPublisher producer, final ActiveMQRASession session)
+   {
+      super(producer, session);
+
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + producer + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Get the topic of the wrapped publisher
+    * @return The topic returned by the wrapped publisher
+    * @exception JMSException Propagated from the wrapped publisher
+    */
+   public Topic getTopic() throws JMSException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getTopic()");
+      }
+
+      final TopicPublisher publisher = (TopicPublisher)producer;
+      return publisher.getTopic();
+   }
+
+   /**
+    * Publish a message with explicit delivery settings
+    * @param message The message
+    * @param deliveryMode The delivery mode
+    * @param priority The priority
+    * @param timeToLive The time to live
+    * @exception JMSException Thrown by the state check or by the wrapped publisher
+    */
+   public void publish(final Message message, final int deliveryMode, final int priority, final long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         }
+
+         checkState();
+
+         final TopicPublisher publisher = (TopicPublisher)producer;
+         publisher.publish(message, deliveryMode, priority, timeToLive);
+
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Publish a message
+    * @param message The message
+    * @exception JMSException Thrown by the state check or by the wrapped publisher
+    */
+   public void publish(final Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this + " message=" + message);
+         }
+
+         checkState();
+
+         final TopicPublisher publisher = (TopicPublisher)producer;
+         publisher.publish(message);
+
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Publish a message to the given topic with explicit delivery settings
+    * @param destination The destination
+    * @param message The message
+    * @param deliveryMode The delivery mode
+    * @param priority The priority
+    * @param timeToLive The time to live
+    * @exception JMSException Thrown by the state check or by the wrapped publisher
+    */
+   public void publish(final Topic destination,
+                       final Message message,
+                       final int deliveryMode,
+                       final int priority,
+                       final long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this + " destination=" + destination + " message=" + message + " deliveryMode=" + deliveryMode + " priority=" + priority + " ttl=" + timeToLive);
+         }
+
+         checkState();
+
+         final TopicPublisher publisher = (TopicPublisher)producer;
+         publisher.publish(destination, message, deliveryMode, priority, timeToLive);
+
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Publish a message to the given topic
+    * @param destination The destination
+    * @param message The message
+    * @exception JMSException Thrown by the state check or by the wrapped publisher
+    */
+   public void publish(final Topic destination, final Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this + " destination=" + destination + " message=" + message);
+         }
+
+         checkState();
+
+         final TopicPublisher publisher = (TopicPublisher)producer;
+         publisher.publish(destination, message);
+
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
+         }
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicSubscriber.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicSubscriber.java
 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicSubscriber.java
new file mode 100644
index 0000000..25087a1
--- /dev/null
+++ 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicSubscriber.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+
+/**
+ * A wrapper for a topic subscriber: calls are optionally traced, the state is
+ * checked, and the call is forwarded to the wrapped {@link TopicSubscriber}.
+ *
+ * @author <a href="mailto:[email protected]">Adrian Brock</a>
+ * @author <a href="mailto:[email protected]">Jesper Pedersen</a>
+ */
+public class ActiveMQRATopicSubscriber extends ActiveMQRAMessageConsumer implements TopicSubscriber
+{
+   /** Trace flag, read from the logger when this class is initialized */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Wrap a topic subscriber
+    * @param consumer the topic subscriber to wrap
+    * @param session the session passed on to the superclass
+    */
+   public ActiveMQRATopicSubscriber(final TopicSubscriber consumer, final ActiveMQRASession session)
+   {
+      super(consumer, session);
+
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + consumer + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Get the no local value of the wrapped subscriber
+    * @return The value returned by the wrapped subscriber
+    * @exception JMSException Thrown by the state check or by the wrapped subscriber
+    */
+   public boolean getNoLocal() throws JMSException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getNoLocal()");
+      }
+
+      checkState();
+
+      final TopicSubscriber subscriber = (TopicSubscriber)consumer;
+      return subscriber.getNoLocal();
+   }
+
+   /**
+    * Get the topic of the wrapped subscriber
+    * @return The topic returned by the wrapped subscriber
+    * @exception JMSException Thrown by the state check or by the wrapped subscriber
+    */
+   public Topic getTopic() throws JMSException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getTopic()");
+      }
+
+      checkState();
+
+      final TopicSubscriber subscriber = (TopicSubscriber)consumer;
+      return subscriber.getTopic();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAJMSContext.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAJMSContext.java 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAJMSContext.java
new file mode 100644
index 0000000..75e4fda
--- /dev/null
+++ 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAJMSContext.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import org.apache.activemq.jms.client.ActiveMQConnectionForContext;
+import org.apache.activemq.jms.client.ThreadAwareContext;
+
+import javax.jms.XAJMSContext;
+
+/**
+ * An {@link ActiveMQRAJMSContext} that is additionally declared as an
+ * {@link XAJMSContext}; it adds no members beyond its constructor.
+ */
+public class ActiveMQRAXAJMSContext extends ActiveMQRAJMSContext implements XAJMSContext
+{
+   /**
+    * Create a new context
+    * @param connection the connection passed on to the superclass
+    * @param threadAwareContext the thread-aware context passed on to the superclass
+    */
+   public ActiveMQRAXAJMSContext(final ActiveMQConnectionForContext connection,
+                                 final ThreadAwareContext threadAwareContext)
+   {
+      super(connection, threadAwareContext);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAResource.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAResource.java 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAResource.java
new file mode 100644
index 0000000..cd61b20
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAResource.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.core.client.impl.ActiveMQXAResource;
+
+/**
+ * A wrapper around an {@link XAResource}. Every call is optionally traced and
+ * forwarded to the wrapped resource; {@code start}, {@code end} and
+ * {@code forget} additionally run under the managed connection's lock and
+ * update its in-managed-transaction flag.
+ *
+ * @author <a href="[email protected]">Adrian Brock</a>
+ * @author <a href="[email protected]">Jesper Pedersen</a>
+ */
+public class ActiveMQRAXAResource implements ActiveMQXAResource
+{
+   /** Trace enabled; read from the logger once, when this class is initialized */
+   private static final boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /** The managed connection */
+   private final ActiveMQRAManagedConnection managedConnection;
+
+   /** The resource */
+   private final XAResource xaResource;
+
+   /**
+    * Create a new ActiveMQRAXAResource.
+    * @param managedConnection the managed connection
+    * @param xaResource the xa resource to wrap
+    */
+   public ActiveMQRAXAResource(final ActiveMQRAManagedConnection managedConnection, final XAResource xaResource)
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + managedConnection + ", " + xaResource + ")");
+      }
+
+      this.managedConnection = managedConnection;
+      this.xaResource = xaResource;
+   }
+
+   /**
+    * Start. Takes the managed connection's lock, asks the underlying session
+    * to reset itself if needed and then forwards to the wrapped resource.
+    * On every exit path the connection is flagged as being in a managed
+    * transaction and the lock is released.
+    * @param xid A global transaction identifier
+    * @param flags One of TMNOFLAGS, TMJOIN, or TMRESUME
+    * @exception XAException An error has occurred
+    */
+   public void start(final Xid xid, final int flags) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("start(" + xid + ", " + flags + ")");
+      }
+
+      managedConnection.lock();
+      try
+      {
+         // The cast and the reset run inside the guarded region so that an
+         // unchecked exception from either cannot leave the lock held.
+         ClientSessionInternal sessionInternal = (ClientSessionInternal) xaResource;
+         try
+         {
+            // this resets any tx stuff, we assume here that the tm and jca
+            // layer are well behaved when it comes to this
+            sessionInternal.resetIfNeeded();
+         }
+         catch (ActiveMQException e)
+         {
+            // Best effort: the failure is only logged and start() proceeds.
+            ActiveMQRALogger.LOGGER.problemResettingXASession();
+         }
+
+         xaResource.start(xid, flags);
+      }
+      finally
+      {
+         managedConnection.setInManagedTx(true);
+         managedConnection.unlock();
+      }
+   }
+
+   /**
+    * End. Forwards to the wrapped resource under the managed connection's
+    * lock; the in-managed-transaction flag is cleared on every exit path.
+    * @param xid A global transaction identifier
+    * @param flags One of TMSUCCESS, TMFAIL, or TMSUSPEND.
+    * @exception XAException An error has occurred
+    */
+   public void end(final Xid xid, final int flags) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("end(" + xid + ", " + flags + ")");
+      }
+
+      managedConnection.lock();
+      try
+      {
+         xaResource.end(xid, flags);
+      }
+      finally
+      {
+         managedConnection.setInManagedTx(false);
+         managedConnection.unlock();
+      }
+   }
+
+   /**
+    * Prepare
+    * @param xid A global transaction identifier
+    * @return XA_RDONLY or XA_OK
+    * @exception XAException An error has occurred
+    */
+   public int prepare(final Xid xid) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("prepare(" + xid + ")");
+      }
+
+      return xaResource.prepare(xid);
+   }
+
+   /**
+    * Commit
+    * @param xid A global transaction identifier
+    * @param onePhase If true, the resource manager should use a one-phase commit protocol to commit the work done on behalf of xid.
+    * @exception XAException An error has occurred
+    */
+   public void commit(final Xid xid, final boolean onePhase) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("commit(" + xid + ", " + onePhase + ")");
+      }
+
+      xaResource.commit(xid, onePhase);
+   }
+
+   /**
+    * Rollback
+    * @param xid A global transaction identifier
+    * @exception XAException An error has occurred
+    */
+   public void rollback(final Xid xid) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("rollback(" + xid + ")");
+      }
+
+      xaResource.rollback(xid);
+   }
+
+   /**
+    * Forget. Forwards to the wrapped resource under the managed connection's
+    * lock; the in-managed-transaction flag ends up cleared on every exit path.
+    * @param xid A global transaction identifier
+    * @exception XAException An error has occurred
+    */
+   public void forget(final Xid xid) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("forget(" + xid + ")");
+      }
+
+      managedConnection.lock();
+      try
+      {
+         xaResource.forget(xid);
+      }
+      finally
+      {
+         // NOTE(review): the flag is set to true and immediately back to false.
+         // The first call looks redundant, but setInManagedTx is not visible
+         // here, so both calls are kept - confirm it has no side effects
+         // before removing it.
+         managedConnection.setInManagedTx(true);
+         managedConnection.setInManagedTx(false);
+         managedConnection.unlock();
+      }
+   }
+
+   /**
+    * IsSameRM
+    * @param xaRes An XAResource object whose resource manager instance is to be compared with the resource manager instance of the target object.
+    * @return True if its the same RM instance; otherwise false.
+    * @exception XAException An error has occurred
+    */
+   public boolean isSameRM(final XAResource xaRes) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("isSameRM(" + xaRes + ")");
+      }
+
+      return xaResource.isSameRM(xaRes);
+   }
+
+   /**
+    * Recover
+    * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS
+    * @return Zero or more XIDs
+    * @exception XAException An error has occurred
+    */
+   public Xid[] recover(final int flag) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("recover(" + flag + ")");
+      }
+
+      return xaResource.recover(flag);
+   }
+
+   /**
+    * Get the transaction timeout in seconds
+    * @return The transaction timeout
+    * @exception XAException An error has occurred
+    */
+   public int getTransactionTimeout() throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getTransactionTimeout()");
+      }
+
+      return xaResource.getTransactionTimeout();
+   }
+
+   /**
+    * Set the transaction timeout
+    * @param seconds The number of seconds
+    * @return True if the transaction timeout value is set successfully; otherwise false.
+    * @exception XAException An error has occurred
+    */
+   public boolean setTransactionTimeout(final int seconds) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setTransactionTimeout(" + seconds + ")");
+      }
+
+      return xaResource.setTransactionTimeout(seconds);
+   }
+
+   /**
+    * Get the wrapped resource
+    * @return The XAResource this object was constructed with
+    */
+   @Override
+   public XAResource getResource()
+   {
+      return xaResource;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRaUtils.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRaUtils.java 
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRaUtils.java
new file mode 100644
index 0000000..377e980
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRaUtils.java
@@ -0,0 +1,354 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.naming.Context;
+import javax.transaction.TransactionManager;
+import java.lang.reflect.Method;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.jgroups.JChannel;
+
+/**
+ * Various utility functions
+ *
+ * @author <a href="mailto:[email protected]";>Adrian Brock</a>
+ * @author <a href="mailto:[email protected]";>Jesper Pedersen</a>
+ */
+public final class ActiveMQRaUtils
+{
+   /**
+    * Private constructor
+    */
+   private ActiveMQRaUtils()
+   {
+   }
+
+   /**
+    * Compare two strings.
+    *
+    * @param me  First value
+    * @param you Second value
+    * @return True if object equals else false.
+    */
+   public static boolean compare(final String me, final String you)
+   {
+      // If both null or intern equals
+      if (me == you)
+      {
+         return true;
+      }
+
+      // if me null and you are not
+      if (me == null)
+      {
+         return false;
+      }
+
+      // me will not be null, test for equality
+      return me.equals(you);
+   }
+
+   /**
+    * Compare two integers.
+    *
+    * @param me  First value
+    * @param you Second value
+    * @return True if object equals else false.
+    */
+   public static boolean compare(final Integer me, final Integer you)
+   {
+      // If both null or intern equals
+      if (me == you)
+      {
+         return true;
+      }
+
+      // if me null and you are not
+      if (me == null)
+      {
+         return false;
+      }
+
+      // me will not be null, test for equality
+      return me.equals(you);
+   }
+
+   /**
+    * Compare two longs.
+    *
+    * @param me  First value
+    * @param you Second value
+    * @return True if object equals else false.
+    */
+   public static boolean compare(final Long me, final Long you)
+   {
+      // If both null or intern equals
+      if (me == you)
+      {
+         return true;
+      }
+
+      // if me null and you are not
+      if (me == null)
+      {
+         return false;
+      }
+
+      // me will not be null, test for equality
+      return me.equals(you);
+   }
+
+   /**
+    * Compare two doubles.
+    *
+    * @param me  First value
+    * @param you Second value
+    * @return True if object equals else false.
+    */
+   public static boolean compare(final Double me, final Double you)
+   {
+      // If both null or intern equals
+      if (me == you)
+      {
+         return true;
+      }
+
+      // if me null and you are not
+      if (me == null)
+      {
+         return false;
+      }
+
+      // me will not be null, test for equality
+      return me.equals(you);
+   }
+
+   /**
+    * Compare two booleans.
+    *
+    * @param me  First value
+    * @param you Second value
+    * @return True if object equals else false.
+    */
+   public static boolean compare(final Boolean me, final Boolean you)
+   {
+      // If both null or intern equals
+      if (me == you)
+      {
+         return true;
+      }
+
+      // if me null and you are not
+      if (me == null)
+      {
+         return false;
+      }
+
+      // me will not be null, test for equality
+      return me.equals(you);
+   }
+
+   /**
+    * Lookup an object in the default initial context
+    *
+    * @param context The context to use
+    * @param name    the name to lookup
+    * @param clazz   the expected type
+    * @return the object
+    * @throws Exception for any error
+    */
+   public static Object lookup(final Context context, final String name, final 
Class<?> clazz) throws Exception
+   {
+      return context.lookup(name);
+   }
+
+   /**
+    * Used on parsing JNDI Configuration
+    *
+    * @param config
+    * @return hash-table with configuration option pairs
+    */
+   public static Hashtable<String, String> parseHashtableConfig(final String 
config)
+   {
+      Hashtable<String, String> hashtable = new Hashtable<String, String>();
+
+      String[] topElements = config.split(";");
+
+      for (String element : topElements)
+      {
+         String[] expression = element.split("=");
+
+         if (expression.length != 2)
+         {
+            throw new IllegalArgumentException("Invalid expression " + element 
+ " at " + config);
+         }
+
+         hashtable.put(expression[0].trim(), expression[1].trim());
+      }
+
+      return hashtable;
+   }
+
+   public static List<Map<String, Object>> parseConfig(final String config)
+   {
+      List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
+
+      String[] topElements = config.split(",");
+
+      for (String topElement : topElements)
+      {
+         HashMap<String, Object> map = new HashMap<String, Object>();
+         result.add(map);
+
+         String[] elements = topElement.split(";");
+
+         for (String element : elements)
+         {
+            String[] expression = element.split("=");
+
+            if (expression.length != 2)
+            {
+               throw new IllegalArgumentException("Invalid expression " + 
element + " at " + config);
+            }
+
+            map.put(expression[0].trim(), expression[1].trim());
+         }
+      }
+
+
+      return result;
+   }
+
+   public static List<String> parseConnectorConnectorConfig(String config)
+   {
+      List<String> res = new ArrayList<String>();
+
+      String[] elements = config.split(",");
+
+      for (String element : elements)
+      {
+         res.add(element.trim());
+      }
+
+      return res;
+   }
+
+
+   /**
+    * The Resource adapter can't depend on any provider's specific library. 
Because of that we use reflection to locate the
+    * transaction manager during startup.
+    * <p/>
+    * <p/>
+    * TODO: https://jira.jboss.org/browse/HORNETQ-417
+    * We should use a proper SPI instead of reflection
+    * We would need to define a proper SPI package for this.
+    */
+   public static TransactionManager locateTM(final String locatorClass, final 
String locatorMethod)
+   {
+      return AccessController.doPrivileged(new 
PrivilegedAction<TransactionManager>()
+      {
+         public TransactionManager run()
+         {
+            try
+            {
+               ClassLoader loader = 
Thread.currentThread().getContextClassLoader();
+               Class<?> aClass = loader.loadClass(locatorClass);
+               Object o = aClass.newInstance();
+               Method m = aClass.getMethod(locatorMethod);
+               return (TransactionManager) m.invoke(o);
+            }
+            catch (Throwable e)
+            {
+               ActiveMQRALogger.LOGGER.debug(e.getMessage(), e);
+               return null;
+            }
+         }
+      });
+   }
+
+   /**
+    * Within AS7 the RA is loaded by JCA. properties can only be passed in 
String form. However if
+    * RA is configured using jgroups stack, we need to pass a Channel object. 
As is impossible with
+    * JCA, we use this method to allow a JChannel object to be located.
+    */
+   public static JChannel locateJGroupsChannel(final String locatorClass, 
final String name)
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<JChannel>()
+      {
+         public JChannel run()
+         {
+            try
+            {
+               ClassLoader loader = 
Thread.currentThread().getContextClassLoader();
+               Class<?> aClass = loader.loadClass(locatorClass);
+               Object o = aClass.newInstance();
+               Method m = aClass.getMethod("locateChannel", new 
Class[]{String.class});
+               return (JChannel) m.invoke(o, name);
+            }
+            catch (Throwable e)
+            {
+               ActiveMQRALogger.LOGGER.debug(e.getMessage(), e);
+               return null;
+            }
+         }
+      });
+   }
+
+   /**
+    * This seems duplicate code all over the place, but for security reasons 
we can't let something like this to be open in a
+    * utility class, as it would be a door to load anything you like in a safe 
VM.
+    * For that reason any class trying to do a privileged block should do with 
the AccessController directly.
+    */
+   private static Object safeInitNewInstance(final String className)
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<Object>()
+      {
+         public Object run()
+         {
+            ClassLoader loader = getClass().getClassLoader();
+            try
+            {
+               Class<?> clazz = loader.loadClass(className);
+               return clazz.newInstance();
+            }
+            catch (Throwable t)
+            {
+               try
+               {
+                  loader = Thread.currentThread().getContextClassLoader();
+                  if (loader != null)
+                     return loader.loadClass(className).newInstance();
+               }
+               catch (RuntimeException e)
+               {
+                  throw e;
+               }
+               catch (Exception e)
+               {
+               }
+
+               throw new IllegalArgumentException("Could not find class " + 
className);
+            }
+         }
+      });
+   }
+
+
+}

Reply via email to