http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAStreamMessage.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAStreamMessage.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAStreamMessage.java new file mode 100644 index 0000000..4fcb0c4 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAStreamMessage.java @@ -0,0 +1,408 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */
+package org.apache.activemq.ra;
+
+import java.util.Arrays;
+
+import javax.jms.JMSException;
+import javax.jms.StreamMessage;
+
+
+/**
+ * A wrapper for a stream message.
+ * <p>
+ * Every method optionally traces the call and then forwards it to the wrapped
+ * message, cast to {@link StreamMessage}.
+ *
+ * @author <a href="mailto:[email protected]">Adrian Brock</a>
+ * @author <a href="mailto:[email protected]">Jesper Pedersen</a>
+ */
+public class ActiveMQRAStreamMessage extends ActiveMQRAMessage implements StreamMessage
+{
+   /** Whether trace is enabled (read once, when this class is initialized) */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * @param message the message
+    * @param session the session
+    */
+   public ActiveMQRAStreamMessage(final StreamMessage message, final ActiveMQRASession session)
+   {
+      super(message, session);
+
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + message + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Read a boolean
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean readBoolean() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readBoolean()");
+      }
+
+      return ((StreamMessage)message).readBoolean();
+   }
+
+   /**
+    * Read a byte
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public byte readByte() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readByte()");
+      }
+
+      return ((StreamMessage)message).readByte();
+   }
+
+   /**
+    * Read bytes into the supplied buffer
+    * @param value The buffer handed to the wrapped message
+    * @return The value returned by the wrapped message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public int readBytes(final byte[] value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readBytes(" + Arrays.toString(value) + ")");
+      }
+
+      return ((StreamMessage)message).readBytes(value);
+   }
+
+   /**
+    * Read a char
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public char readChar() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readChar()");
+      }
+
+      return ((StreamMessage)message).readChar();
+   }
+
+   /**
+    * Read a double
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public double readDouble() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readDouble()");
+      }
+
+      return ((StreamMessage)message).readDouble();
+   }
+
+   /**
+    * Read a float
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public float readFloat() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readFloat()");
+      }
+
+      return ((StreamMessage)message).readFloat();
+   }
+
+   /**
+    * Read an int
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public int readInt() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readInt()");
+      }
+
+      return ((StreamMessage)message).readInt();
+   }
+
+   /**
+    * Read a long
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public long readLong() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readLong()");
+      }
+
+      return ((StreamMessage)message).readLong();
+   }
+
+   /**
+    * Read an object
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Object readObject() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readObject()");
+      }
+
+      return ((StreamMessage)message).readObject();
+   }
+
+   /**
+    * Read a short
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public short readShort() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readShort()");
+      }
+
+      return ((StreamMessage)message).readShort();
+   }
+
+   /**
+    * Read a string
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String readString() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("readString()");
+      }
+
+      return ((StreamMessage)message).readString();
+   }
+
+   /**
+    * Reset
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void reset() throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("reset()");
+      }
+
+      ((StreamMessage)message).reset();
+   }
+
+   /**
+    * Write a boolean
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeBoolean(final boolean value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeBoolean(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeBoolean(value);
+   }
+
+   /**
+    * Write a byte
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeByte(final byte value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeByte(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeByte(value);
+   }
+
+   /**
+    * Write part of a byte array
+    * @param value The value
+    * @param offset The offset
+    * @param length The length
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         // Arrays.toString (as in readBytes) logs the content; plain concatenation of a
+         // byte[] would only log its identity string, e.g. "[B@1a2b3c".
+         ActiveMQRALogger.LOGGER.trace("writeBytes(" + Arrays.toString(value) + ", " + offset + ", " + length + ")");
+      }
+
+      ((StreamMessage)message).writeBytes(value, offset, length);
+   }
+
+   /**
+    * Write a byte array
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeBytes(final byte[] value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         // See the three-argument overload: log the array content, not its identity string.
+         ActiveMQRALogger.LOGGER.trace("writeBytes(" + Arrays.toString(value) + ")");
+      }
+
+      ((StreamMessage)message).writeBytes(value);
+   }
+
+   /**
+    * Write a char
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeChar(final char value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeChar(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeChar(value);
+   }
+
+   /**
+    * Write a double
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeDouble(final double value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeDouble(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeDouble(value);
+   }
+
+   /**
+    * Write a float
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeFloat(final float value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeFloat(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeFloat(value);
+   }
+
+   /**
+    * Write an int
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeInt(final int value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeInt(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeInt(value);
+   }
+
+   /**
+    * Write a long
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeLong(final long value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeLong(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeLong(value);
+   }
+
+   /**
+    * Write an object
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeObject(final Object value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeObject(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeObject(value);
+   }
+
+   /**
+    * Write a short
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeShort(final short value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeShort(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeShort(value);
+   }
+
+   /**
+    * Write a string
+    * @param value The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void writeString(final String value) throws JMSException
+   {
+      if (ActiveMQRAStreamMessage.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("writeString(" + value + ")");
+      }
+
+      ((StreamMessage)message).writeString(value);
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATextMessage.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATextMessage.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATextMessage.java new file mode 100644 index 0000000..f326992 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATextMessage.java @@ -0,0 +1,74 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+
+/**
+ * A wrapper for a text message.
+ * <p>
+ * Each accessor optionally traces the call and then forwards it to the wrapped
+ * message, cast to {@link TextMessage}.
+ *
+ * @author <a href="mailto:[email protected]">Adrian Brock</a>
+ * @author <a href="mailto:[email protected]">Jesper Pedersen</a>
+ */
+public class ActiveMQRATextMessage extends ActiveMQRAMessage implements TextMessage
+{
+   /** Whether trace is enabled (read once, when this class is initialized) */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * @param message the message to wrap
+    * @param session the owning session
+    */
+   public ActiveMQRATextMessage(final TextMessage message, final ActiveMQRASession session)
+   {
+      super(message, session);
+
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + message + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Get the text of the wrapped message
+    * @return The text
+    * @exception JMSException Thrown if an error occurs
+    */
+   public String getText() throws JMSException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getText()");
+      }
+
+      final TextMessage delegate = (TextMessage)message;
+      return delegate.getText();
+   }
+
+   /**
+    * Set the text of the wrapped message
+    * @param string The text
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void setText(final String string) throws JMSException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setText(" + string + ")");
+      }
+
+      final TextMessage delegate = (TextMessage)message;
+      delegate.setText(string);
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicPublisher.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicPublisher.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicPublisher.java
new file mode 100644
index 0000000..99062c8
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicPublisher.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+
+/**
+ * A wrapper for a topic publisher.
+ * <p>
+ * Every {@code publish} variant takes the session lock, calls {@code checkState()},
+ * forwards to the wrapped producer cast to {@link TopicPublisher}, and releases the
+ * session lock in a {@code finally} block.
+ *
+ * @author <a href="[email protected]">Adrian Brock</a>
+ * @author <a href="[email protected]">Jesper Pedersen</a>
+ */
+public class ActiveMQRATopicPublisher extends ActiveMQRAMessageProducer implements TopicPublisher
+{
+   /** Whether trace is enabled (read once, when this class is initialized) */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * @param producer the topic publisher to wrap
+    * @param session the owning session
+    */
+   public ActiveMQRATopicPublisher(final TopicPublisher producer, final ActiveMQRASession session)
+   {
+      super(producer, session);
+
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + producer + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Get the topic of the wrapped publisher
+    * @return The topic
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Topic getTopic() throws JMSException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getTopic()");
+      }
+
+      final TopicPublisher delegate = (TopicPublisher)producer;
+      return delegate.getTopic();
+   }
+
+   /**
+    * Publish a message with explicit delivery settings
+    * @param message The message
+    * @param deliveryMode The delivery mode
+    * @param priority The priority
+    * @param timeToLive The time to live
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void publish(final Message message, final int deliveryMode, final int priority, final long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this + " message=" + message + " deliveryMode=" + deliveryMode +
+                                          " priority=" + priority + " ttl=" + timeToLive);
+         }
+
+         checkState();
+
+         final TopicPublisher delegate = (TopicPublisher)producer;
+         delegate.publish(message, deliveryMode, priority, timeToLive);
+
+         traceSent(message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Publish a message
+    * @param message The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void publish(final Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this + " message=" + message);
+         }
+
+         checkState();
+
+         final TopicPublisher delegate = (TopicPublisher)producer;
+         delegate.publish(message);
+
+         traceSent(message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Publish a message to a given topic with explicit delivery settings
+    * @param destination The destination
+    * @param message The message
+    * @param deliveryMode The delivery mode
+    * @param priority The priority
+    * @param timeToLive The time to live
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void publish(final Topic destination,
+                       final Message message,
+                       final int deliveryMode,
+                       final int priority,
+                       final long timeToLive) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this + " destination=" + destination + " message=" + message +
+                                          " deliveryMode=" + deliveryMode + " priority=" + priority +
+                                          " ttl=" + timeToLive);
+         }
+
+         checkState();
+
+         final TopicPublisher delegate = (TopicPublisher)producer;
+         delegate.publish(destination, message, deliveryMode, priority, timeToLive);
+
+         traceSent(message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Publish a message to a given topic
+    * @param destination The destination
+    * @param message The message
+    * @exception JMSException Thrown if an error occurs
+    */
+   public void publish(final Topic destination, final Message message) throws JMSException
+   {
+      session.lock();
+      try
+      {
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("send " + this + " destination=" + destination + " message=" + message);
+         }
+
+         checkState();
+
+         final TopicPublisher delegate = (TopicPublisher)producer;
+         delegate.publish(destination, message);
+
+         traceSent(message);
+      }
+      finally
+      {
+         session.unlock();
+      }
+   }
+
+   /**
+    * Trace the completion of a publish call, if trace is enabled
+    * @param message The message that was handed to the wrapped publisher
+    */
+   private void traceSent(final Message message)
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicSubscriber.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicSubscriber.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicSubscriber.java
new file mode 100644
index 0000000..25087a1
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRATopicSubscriber.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+
+/**
+ * A wrapper for a topic subscriber.
+ * <p>
+ * Both accessors optionally trace the call, invoke {@code checkState()} and then
+ * forward to the wrapped consumer, cast to {@link TopicSubscriber}.
+ *
+ * @author <a href="mailto:[email protected]">Adrian Brock</a>
+ * @author <a href="mailto:[email protected]">Jesper Pedersen</a>
+ */
+public class ActiveMQRATopicSubscriber extends ActiveMQRAMessageConsumer implements TopicSubscriber
+{
+   /** Whether trace is enabled (read once, when this class is initialized) */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Create a new wrapper
+    * @param consumer the topic subscriber to wrap
+    * @param session the owning session
+    */
+   public ActiveMQRATopicSubscriber(final TopicSubscriber consumer, final ActiveMQRASession session)
+   {
+      super(consumer, session);
+
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + consumer + ", " + session + ")");
+      }
+   }
+
+   /**
+    * Get the no local value of the wrapped subscriber
+    * @return The value
+    * @exception JMSException Thrown if an error occurs
+    */
+   public boolean getNoLocal() throws JMSException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getNoLocal()");
+      }
+
+      checkState();
+
+      final TopicSubscriber delegate = (TopicSubscriber)consumer;
+      return delegate.getNoLocal();
+   }
+
+   /**
+    * Get the topic of the wrapped subscriber
+    * @return The topic
+    * @exception JMSException Thrown if an error occurs
+    */
+   public Topic getTopic() throws JMSException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getTopic()");
+      }
+
+      checkState();
+
+      final TopicSubscriber delegate = (TopicSubscriber)consumer;
+      return delegate.getTopic();
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAJMSContext.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAJMSContext.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAJMSContext.java new file mode 100644 index 0000000..75e4fda --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAJMSContext.java @@ -0,0 +1,26 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */
+package org.apache.activemq.ra;
+
+import org.apache.activemq.jms.client.ActiveMQConnectionForContext;
+import org.apache.activemq.jms.client.ThreadAwareContext;
+
+import javax.jms.XAJMSContext;
+
+/**
+ * The {@link XAJMSContext} flavour of {@code ActiveMQRAJMSContext}; it adds no
+ * members of its own and only forwards its constructor arguments to the superclass.
+ */
+public class ActiveMQRAXAJMSContext extends ActiveMQRAJMSContext implements XAJMSContext
+{
+   /**
+    * Create a new context
+    * @param connection the connection handed to the superclass
+    * @param threadAwareContext the thread aware context handed to the superclass
+    */
+   public ActiveMQRAXAJMSContext(final ActiveMQConnectionForContext connection,
+                                 final ThreadAwareContext threadAwareContext)
+   {
+      super(connection, threadAwareContext);
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAResource.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAResource.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAResource.java
new file mode 100644
index 0000000..cd61b20
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAXAResource.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.core.client.impl.ActiveMQXAResource;
+
+/**
+ * ActiveMQXAResource.
+ *
+ * <p>
+ * Wraps an {@link XAResource} on behalf of a managed connection. {@code start},
+ * {@code end} and {@code forget} run under the managed connection's lock and update
+ * its in-managed-transaction flag; all other calls are traced and forwarded as is.
+ *
+ * @author <a href="[email protected]">Adrian Brock</a>
+ * @author <a href="[email protected]">Jesper Pedersen</a>
+ */
+public class ActiveMQRAXAResource implements ActiveMQXAResource
+{
+   /** Trace enabled (read once, when this class is initialized) */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /** The managed connection */
+   private final ActiveMQRAManagedConnection managedConnection;
+
+   /** The resource */
+   private final XAResource xaResource;
+
+   /**
+    * Create a new ActiveMQXAResource.
+    * @param managedConnection the managed connection
+    * @param xaResource the xa resource
+    */
+   public ActiveMQRAXAResource(final ActiveMQRAManagedConnection managedConnection, final XAResource xaResource)
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + managedConnection + ", " + xaResource + ")");
+      }
+
+      this.managedConnection = managedConnection;
+      this.xaResource = xaResource;
+   }
+
+   /**
+    * Start
+    * <p>
+    * Takes the managed connection lock, resets the underlying session if needed
+    * (a failure to reset is logged and otherwise ignored), then starts the wrapped
+    * resource. The in-managed-transaction flag is set and the lock released in a
+    * {@code finally} block, whatever the outcome.
+    * @param xid A global transaction identifier
+    * @param flags One of TMNOFLAGS, TMJOIN, or TMRESUME
+    * @exception XAException An error has occurred
+    */
+   public void start(final Xid xid, final int flags) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("start(" + xid + ", " + flags + ")");
+      }
+
+      managedConnection.lock();
+      try
+      {
+         // The cast and the reset live inside the try so that an unchecked failure here
+         // (for instance a wrapped resource that is not a ClientSessionInternal) still
+         // reaches the finally block and releases the lock instead of leaking it.
+         ClientSessionInternal sessionInternal = (ClientSessionInternal) xaResource;
+         try
+         {
+            //this resets any tx stuff, we assume here that the tm and jca layer are well behaved when it comes to this
+            sessionInternal.resetIfNeeded();
+         }
+         catch (ActiveMQException e)
+         {
+            // Best effort: report the problem and still attempt to start the transaction.
+            ActiveMQRALogger.LOGGER.problemResettingXASession();
+         }
+
+         xaResource.start(xid, flags);
+      }
+      finally
+      {
+         managedConnection.setInManagedTx(true);
+         managedConnection.unlock();
+      }
+   }
+
+   /**
+    * End
+    * <p>
+    * Runs under the managed connection lock; the in-managed-transaction flag is
+    * cleared and the lock released in a {@code finally} block.
+    * @param xid A global transaction identifier
+    * @param flags One of TMSUCCESS, TMFAIL, or TMSUSPEND.
+    * @exception XAException An error has occurred
+    */
+   public void end(final Xid xid, final int flags) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("end(" + xid + ", " + flags + ")");
+      }
+
+      managedConnection.lock();
+      try
+      {
+         xaResource.end(xid, flags);
+      }
+      finally
+      {
+         managedConnection.setInManagedTx(false);
+         managedConnection.unlock();
+      }
+   }
+
+   /**
+    * Prepare
+    * @param xid A global transaction identifier
+    * @return XA_RDONLY or XA_OK
+    * @exception XAException An error has occurred
+    */
+   public int prepare(final Xid xid) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("prepare(" + xid + ")");
+      }
+
+      return xaResource.prepare(xid);
+   }
+
+   /**
+    * Commit
+    * @param xid A global transaction identifier
+    * @param onePhase If true, the resource manager should use a one-phase commit protocol to commit the work done on behalf of xid.
+    * @exception XAException An error has occurred
+    */
+   public void commit(final Xid xid, final boolean onePhase) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("commit(" + xid + ", " + onePhase + ")");
+      }
+
+      xaResource.commit(xid, onePhase);
+   }
+
+   /**
+    * Rollback
+    * @param xid A global transaction identifier
+    * @exception XAException An error has occurred
+    */
+   public void rollback(final Xid xid) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("rollback(" + xid + ")");
+      }
+
+      xaResource.rollback(xid);
+   }
+
+   /**
+    * Forget
+    * <p>
+    * Runs under the managed connection lock; the in-managed-transaction flag ends up
+    * cleared and the lock is released in a {@code finally} block.
+    * @param xid A global transaction identifier
+    * @exception XAException An error has occurred
+    */
+   public void forget(final Xid xid) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("forget(" + xid + ")");
+      }
+
+      managedConnection.lock();
+      try
+      {
+         xaResource.forget(xid);
+      }
+      finally
+      {
+         // NOTE(review): setting the flag to true and immediately back to false looks
+         // redundant; kept as is because setInManagedTx is not visible here and may do
+         // more than store the value - confirm before removing the first call.
+         managedConnection.setInManagedTx(true);
+         managedConnection.setInManagedTx(false);
+         managedConnection.unlock();
+      }
+   }
+
+   /**
+    * IsSameRM
+    * @param xaRes An XAResource object whose resource manager instance is to be compared with the resource manager instance of the target object.
+    * @return True if its the same RM instance; otherwise false.
+    * @exception XAException An error has occurred
+    */
+   public boolean isSameRM(final XAResource xaRes) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("isSameRM(" + xaRes + ")");
+      }
+
+      return xaResource.isSameRM(xaRes);
+   }
+
+   /**
+    * Recover
+    * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS
+    * @return Zero or more XIDs
+    * @exception XAException An error has occurred
+    */
+   public Xid[] recover(final int flag) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("recover(" + flag + ")");
+      }
+
+      return xaResource.recover(flag);
+   }
+
+   /**
+    * Get the transaction timeout in seconds
+    * @return The transaction timeout
+    * @exception XAException An error has occurred
+    */
+   public int getTransactionTimeout() throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getTransactionTimeout()");
+      }
+
+      return xaResource.getTransactionTimeout();
+   }
+
+   /**
+    * Set the transaction timeout
+    * @param seconds The number of seconds
+    * @return True if the transaction timeout value is set successfully; otherwise false.
+    * @exception XAException An error has occurred
+    */
+   public boolean setTransactionTimeout(final int seconds) throws XAException
+   {
+      if (ActiveMQRAXAResource.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setTransactionTimeout(" + seconds + ")");
+      }
+
+      return xaResource.setTransactionTimeout(seconds);
+   }
+
+   /**
+    * Get the wrapped resource
+    * @return The XA resource passed to the constructor
+    */
+   @Override
+   public XAResource getResource()
+   {
+      return xaResource;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRaUtils.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRaUtils.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRaUtils.java
new file mode 100644
index 0000000..377e980
--- /dev/null
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRaUtils.java
@@ -0,0 +1,354 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
*/
package org.apache.activemq.ra;

import javax.naming.Context;
import javax.transaction.TransactionManager;
import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;

import org.jgroups.JChannel;

/**
 * Various utility functions: null-safe value comparison, parsing of
 * string-encoded configuration, and reflective lookup of a transaction
 * manager or a JGroups channel.
 *
 * @author <a href="mailto:[email protected]">Adrian Brock</a>
 * @author <a href="mailto:[email protected]">Jesper Pedersen</a>
 */
public final class ActiveMQRaUtils
{
   /**
    * Private constructor; this class only exposes static helpers.
    */
   private ActiveMQRaUtils()
   {
   }

   /**
    * Compare two strings.
    *
    * @param me  First value, may be null
    * @param you Second value, may be null
    * @return True if both are null or {@code me.equals(you)}, else false.
    */
   public static boolean compare(final String me, final String you)
   {
      return nullSafeEquals(me, you);
   }

   /**
    * Compare two integers.
    *
    * @param me  First value, may be null
    * @param you Second value, may be null
    * @return True if both are null or {@code me.equals(you)}, else false.
    */
   public static boolean compare(final Integer me, final Integer you)
   {
      return nullSafeEquals(me, you);
   }

   /**
    * Compare two longs.
    *
    * @param me  First value, may be null
    * @param you Second value, may be null
    * @return True if both are null or {@code me.equals(you)}, else false.
    */
   public static boolean compare(final Long me, final Long you)
   {
      return nullSafeEquals(me, you);
   }

   /**
    * Compare two doubles.
    *
    * @param me  First value, may be null
    * @param you Second value, may be null
    * @return True if both are null or {@code me.equals(you)}, else false.
    */
   public static boolean compare(final Double me, final Double you)
   {
      return nullSafeEquals(me, you);
   }

   /**
    * Compare two booleans.
    *
    * @param me  First value, may be null
    * @param you Second value, may be null
    * @return True if both are null or {@code me.equals(you)}, else false.
    */
   public static boolean compare(final Boolean me, final Boolean you)
   {
      return nullSafeEquals(me, you);
   }

   /**
    * Shared implementation behind every {@code compare} overload.
    *
    * @param me  First value, may be null
    * @param you Second value, may be null
    * @return True if both references are identical (including both null) or
    *         {@code me} is non-null and {@code me.equals(you)}.
    */
   private static boolean nullSafeEquals(final Object me, final Object you)
   {
      // Same reference, which also covers "both null"
      if (me == you)
      {
         return true;
      }

      // Past the identity check a null "me" means "you" is non-null, so they differ
      return me != null && me.equals(you);
   }

   /**
    * Lookup an object in the supplied context.
    * <p>
    * Note that {@code clazz} is accepted for API compatibility only: it is not
    * read, so the returned object is not checked against it.
    *
    * @param context The context to use
    * @param name    the name to lookup
    * @param clazz   the expected type (currently unused)
    * @return the object returned by {@code context.lookup(name)}
    * @throws Exception for any error
    */
   public static Object lookup(final Context context, final String name, final Class<?> clazz) throws Exception
   {
      return context.lookup(name);
   }

   /**
    * Used on parsing JNDI Configuration. The input is a list of
    * {@code key=value} expressions separated by {@code ;}; keys and values are
    * trimmed.
    *
    * @param config the configuration string, must not be null
    * @return hash-table with configuration option pairs
    * @throws IllegalArgumentException if any {@code ;}-separated element does not
    *                                  split on {@code =} into exactly two parts
    *                                  (this includes an empty {@code config})
    */
   public static Hashtable<String, String> parseHashtableConfig(final String config)
   {
      Hashtable<String, String> hashtable = new Hashtable<String, String>();

      for (String element : config.split(";"))
      {
         String[] pair = splitKeyValue(element, config);
         hashtable.put(pair[0], pair[1]);
      }

      return hashtable;
   }

   /**
    * Parse a list of configurations. Top-level entries are separated by
    * {@code ,}; each entry is a {@code ;}-separated list of {@code key=value}
    * expressions whose keys and values are trimmed.
    *
    * @param config the configuration string, must not be null
    * @return one map per {@code ,}-separated entry, in input order
    * @throws IllegalArgumentException if any expression does not split on
    *                                  {@code =} into exactly two parts
    */
   public static List<Map<String, Object>> parseConfig(final String config)
   {
      List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();

      for (String topElement : config.split(","))
      {
         HashMap<String, Object> map = new HashMap<String, Object>();
         result.add(map);

         for (String element : topElement.split(";"))
         {
            String[] pair = splitKeyValue(element, config);
            map.put(pair[0], pair[1]);
         }
      }

      return result;
   }

   /**
    * Split a {@code ,}-separated list into its trimmed elements.
    *
    * @param config the list, must not be null
    * @return the trimmed elements in input order
    */
   public static List<String> parseConnectorConnectorConfig(String config)
   {
      List<String> res = new ArrayList<String>();

      for (String element : config.split(","))
      {
         res.add(element.trim());
      }

      return res;
   }

   /**
    * Split one {@code key=value} expression; shared by
    * {@link #parseHashtableConfig(String)} and {@link #parseConfig(String)}.
    *
    * @param element the single expression to split
    * @param config  the complete configuration string, used only for the error message
    * @return a two element array holding the trimmed key and the trimmed value
    * @throws IllegalArgumentException if splitting on {@code =} does not yield exactly two parts
    */
   private static String[] splitKeyValue(final String element, final String config)
   {
      String[] expression = element.split("=");

      if (expression.length != 2)
      {
         throw new IllegalArgumentException("Invalid expression " + element + " at " + config);
      }

      return new String[]{expression[0].trim(), expression[1].trim()};
   }

   /**
    * The Resource adapter can't depend on any provider's specific library. Because of that we use reflection to locate the
    * transaction manager during startup.
    * <p/>
    * The locator class is loaded through the thread context class loader, instantiated through its no-argument
    * constructor, and the public no-argument method {@code locatorMethod} is invoked on that instance.
    * <p/>
    * TODO: https://jira.jboss.org/browse/HORNETQ-417
    * We should use a proper SPI instead of reflection
    * We would need to define a proper SPI package for this.
    *
    * @param locatorClass  name of the class to instantiate
    * @param locatorMethod name of the no-argument method returning the transaction manager
    * @return the located transaction manager, or null if anything at all went wrong
    *         (the failure is only logged at debug level)
    */
   public static TransactionManager locateTM(final String locatorClass, final String locatorMethod)
   {
      return AccessController.doPrivileged(new PrivilegedAction<TransactionManager>()
      {
         @Override
         public TransactionManager run()
         {
            try
            {
               ClassLoader loader = Thread.currentThread().getContextClassLoader();
               Class<?> aClass = loader.loadClass(locatorClass);
               Object o = aClass.newInstance();
               Method m = aClass.getMethod(locatorMethod);
               return (TransactionManager) m.invoke(o);
            }
            catch (Throwable e)
            {
               // Best effort by design: callers get null when no TM can be located
               ActiveMQRALogger.LOGGER.debug(e.getMessage(), e);
               return null;
            }
         }
      });
   }

   /**
    * Within AS7 the RA is loaded by JCA. properties can only be passed in String form. However if
    * RA is configured using jgroups stack, we need to pass a Channel object. As is impossible with
    * JCA, we use this method to allow a JChannel object to be located.
    * <p/>
    * The locator class is loaded through the thread context class loader, instantiated through its no-argument
    * constructor, and its public {@code locateChannel(String)} method is invoked with {@code name}.
    *
    * @param locatorClass name of the class to instantiate
    * @param name         argument handed to {@code locateChannel}
    * @return the located channel, or null if anything at all went wrong
    *         (the failure is only logged at debug level)
    */
   public static JChannel locateJGroupsChannel(final String locatorClass, final String name)
   {
      return AccessController.doPrivileged(new PrivilegedAction<JChannel>()
      {
         @Override
         public JChannel run()
         {
            try
            {
               ClassLoader loader = Thread.currentThread().getContextClassLoader();
               Class<?> aClass = loader.loadClass(locatorClass);
               Object o = aClass.newInstance();
               Method m = aClass.getMethod("locateChannel", String.class);
               return (JChannel) m.invoke(o, name);
            }
            catch (Throwable e)
            {
               // Best effort by design: callers get null when no channel can be located
               ActiveMQRALogger.LOGGER.debug(e.getMessage(), e);
               return null;
            }
         }
      });
   }

   /**
    * This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a
    * utility class, as it would be a door to load anything you like in a safe VM.
    * For that reason any class trying to do a privileged block should do with the AccessController directly.
    * <p/>
    * The class is first loaded through the class loader of this code and, if that fails for any reason, through the
    * thread context class loader (when one is set).
    *
    * @param className name of the class to instantiate through its no-argument constructor
    * @return the new instance
    * @throws IllegalArgumentException if neither class loader could produce an instance; the cause is the
    *                                  failure of the first attempt
    */
   private static Object safeInitNewInstance(final String className)
   {
      return AccessController.doPrivileged(new PrivilegedAction<Object>()
      {
         @Override
         public Object run()
         {
            ClassLoader loader = getClass().getClassLoader();
            try
            {
               Class<?> clazz = loader.loadClass(className);
               return clazz.newInstance();
            }
            catch (Throwable t)
            {
               try
               {
                  loader = Thread.currentThread().getContextClassLoader();
                  if (loader != null)
                  {
                     return loader.loadClass(className).newInstance();
                  }
               }
               catch (RuntimeException e)
               {
                  throw e;
               }
               catch (Exception e)
               {
                  // Second attempt failed as well; keep a trace of it instead of dropping it silently
                  ActiveMQRALogger.LOGGER.debug(e.getMessage(), e);
               }

               // Chain the first failure so the root cause is not lost
               throw new IllegalArgumentException("Could not find class " + className, t);
            }
         }
      });
   }
}
