http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageConsumer.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageConsumer.java new file mode 100644 index 0000000..19c74b7 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageConsumer.java @@ -0,0 +1,334 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.ra; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + + +/** + * A wrapper for a message consumer + * + * @author <a href="mailto:[email protected]">Adrian Brock</a> + * @author <a href="mailto:[email protected]">Jesper Pedersen</a> + */ +public class ActiveMQRAMessageConsumer implements MessageConsumer +{ + /** Whether trace is enabled */ + private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled(); + + /** The wrapped message consumer */ + protected MessageConsumer consumer; + + /** The session for this consumer */ + protected ActiveMQRASession session; + + /** + * Create a new wrapper + * @param consumer the consumer + * @param session the session + */ + public ActiveMQRAMessageConsumer(final MessageConsumer consumer, final ActiveMQRASession session) + { + this.consumer = consumer; + this.session = session; + + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("new ActiveMQMessageConsumer " + this + + " consumer=" + + consumer + + " session=" + + session); + } + } + + /** + * Close + * @exception JMSException Thrown if an error occurs + */ + public void close() throws JMSException + { + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("close " + this); + } + try + { + closeConsumer(); + } + finally + { + session.removeConsumer(this); + } + } + + /** + * Check state + * @exception JMSException Thrown if an error occurs + */ + void checkState() throws JMSException + { + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("checkState()"); + } + session.checkState(); + } + + /** + * Get message listener + * @return The listener + * @exception JMSException Thrown if an error occurs + */ + public MessageListener 
getMessageListener() throws JMSException + { + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("getMessageListener()"); + } + + checkState(); + session.checkStrict(); + return consumer.getMessageListener(); + } + + /** + * Set message listener + * @param listener The listener + * @exception JMSException Thrown if an error occurs + */ + public void setMessageListener(final MessageListener listener) throws JMSException + { + session.lock(); + try + { + checkState(); + session.checkStrict(); + if (listener == null) + { + consumer.setMessageListener(null); + } + else + { + consumer.setMessageListener(wrapMessageListener(listener)); + } + } + finally + { + session.unlock(); + } + } + + /** + * Get message selector + * @return The selector + * @exception JMSException Thrown if an error occurs + */ + public String getMessageSelector() throws JMSException + { + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("getMessageSelector()"); + } + + checkState(); + return consumer.getMessageSelector(); + } + + /** + * Receive + * @return The message + * @exception JMSException Thrown if an error occurs + */ + public Message receive() throws JMSException + { + session.lock(); + try + { + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("receive " + this); + } + + checkState(); + Message message = consumer.receive(); + + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("received " + this + " result=" + message); + } + + if (message == null) + { + return null; + } + else + { + return wrapMessage(message); + } + } + finally + { + session.unlock(); + } + } + + /** + * Receive + * @param timeout The timeout value + * @return The message + * @exception JMSException Thrown if an error occurs + */ + public Message receive(final long timeout) throws JMSException + { + session.lock(); + try + { + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("receive " + this + " 
timeout=" + timeout); + } + + checkState(); + Message message = consumer.receive(timeout); + + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("received " + this + " result=" + message); + } + + if (message == null) + { + return null; + } + else + { + return wrapMessage(message); + } + } + finally + { + session.unlock(); + } + } + + /** + * Receive + * @return The message + * @exception JMSException Thrown if an error occurs + */ + public Message receiveNoWait() throws JMSException + { + session.lock(); + try + { + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("receiveNoWait " + this); + } + + checkState(); + Message message = consumer.receiveNoWait(); + + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("received " + this + " result=" + message); + } + + if (message == null) + { + return null; + } + else + { + return wrapMessage(message); + } + } + finally + { + session.unlock(); + } + } + + /** + * Close consumer + * @exception JMSException Thrown if an error occurs + */ + void closeConsumer() throws JMSException + { + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("closeConsumer()"); + } + + consumer.close(); + } + + /** + * Wrap message + * @param message The message to be wrapped + * @return The wrapped message + */ + Message wrapMessage(final Message message) + { + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("wrapMessage(" + message + ")"); + } + + if (message instanceof BytesMessage) + { + return new ActiveMQRABytesMessage((BytesMessage)message, session); + } + else if (message instanceof MapMessage) + { + return new ActiveMQRAMapMessage((MapMessage)message, session); + } + else if (message instanceof ObjectMessage) + { + return new ActiveMQRAObjectMessage((ObjectMessage)message, session); + } + else if (message instanceof StreamMessage) + { + return new ActiveMQRAStreamMessage((StreamMessage)message, session); + } + else if 
(message instanceof TextMessage) + { + return new ActiveMQRATextMessage((TextMessage)message, session); + } + return new ActiveMQRAMessage(message, session); + } + + /** + * Wrap message listener + * @param listener The listener to be wrapped + * @return The wrapped listener + */ + MessageListener wrapMessageListener(final MessageListener listener) + { + if (ActiveMQRAMessageConsumer.trace) + { + ActiveMQRALogger.LOGGER.trace("getMessageSelector()"); + } + + return new ActiveMQRAMessageListener(listener, this); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageListener.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageListener.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageListener.java new file mode 100644 index 0000000..42f0a42 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageListener.java @@ -0,0 +1,65 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.ra; + +import javax.jms.Message; +import javax.jms.MessageListener; + + +/** + * A wrapper for a message listener + * @author <a href="mailto:[email protected]">Adrian Brock</a> + * @author <a href="mailto:[email protected]">Jesper Pedersen</a> + */ +public class ActiveMQRAMessageListener implements MessageListener +{ + /** Whether trace is enabled */ + private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled(); + + /** The message listener */ + private final MessageListener listener; + + /** The consumer */ + private final ActiveMQRAMessageConsumer consumer; + + /** + * Create a new wrapper + * @param listener the listener + * @param consumer the consumer + */ + public ActiveMQRAMessageListener(final MessageListener listener, final ActiveMQRAMessageConsumer consumer) + { + if (ActiveMQRAMessageListener.trace) + { + ActiveMQRALogger.LOGGER.trace("constructor(" + listener + ", " + consumer + ")"); + } + + this.listener = listener; + this.consumer = consumer; + } + + /** + * On message + * @param message The message + */ + public void onMessage(Message message) + { + if (ActiveMQRAMessageListener.trace) + { + ActiveMQRALogger.LOGGER.trace("onMessage(" + message + ")"); + } + + message = consumer.wrapMessage(message); + listener.onMessage(message); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageProducer.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageProducer.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageProducer.java new file mode 100644 index 0000000..4ba4271 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMessageProducer.java @@ -0,0 +1,473 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.ra; + +import javax.jms.CompletionListener; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; + + +/** + * ActiveMQMessageProducer. + * + * @author <a href="[email protected]">Adrian Brock</a> + * @author <a href="[email protected]">Jesper Pedersen</a> + */ +public class ActiveMQRAMessageProducer implements MessageProducer +{ + /** Whether trace is enabled */ + private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled(); + + /** The wrapped message producer */ + protected MessageProducer producer; + + /** The session for this consumer */ + protected ActiveMQRASession session; + + /** + * Create a new wrapper + * @param producer the producer + * @param session the session + */ + public ActiveMQRAMessageProducer(final MessageProducer producer, final ActiveMQRASession session) + { + this.producer = producer; + this.session = session; + + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("new ActiveMQMessageProducer " + this + + " producer=" + + producer + + " session=" + + session); + } + } + + /** + * Close + * @exception JMSException Thrown if an error occurs + */ + public void close() throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("close " + this); + } + try + { + closeProducer(); + } + finally + { + 
session.removeProducer(this); + } + } + + /** + * Send message + * @param destination The destination + * @param message The message + * @param deliveryMode The delivery mode + * @param priority The priority + * @param timeToLive The time to live + * @exception JMSException Thrown if an error occurs + */ + public void send(final Destination destination, + final Message message, + final int deliveryMode, + final int priority, + final long timeToLive) throws JMSException + { + session.lock(); + try + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("send " + this + + " destination=" + + destination + + " message=" + + message + + " deliveryMode=" + + deliveryMode + + " priority=" + + priority + + " ttl=" + + timeToLive); + } + + checkState(); + + producer.send(destination, message, deliveryMode, priority, timeToLive); + + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message); + } + } + finally + { + session.unlock(); + } + } + + /** + * Send message + * @param destination The destination + * @param message The message + * @exception JMSException Thrown if an error occurs + */ + public void send(final Destination destination, final Message message) throws JMSException + { + session.lock(); + try + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("send " + this + " destination=" + destination + " message=" + message); + } + + checkState(); + + producer.send(destination, message); + + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message); + } + } + finally + { + session.unlock(); + } + } + + /** + * Send message + * @param message The message + * @param deliveryMode The delivery mode + * @param priority The priority + * @param timeToLive The time to live + * @exception JMSException Thrown if an error occurs + */ + public void send(final Message message, final int deliveryMode, final int priority, 
final long timeToLive) throws JMSException + { + session.lock(); + try + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("send " + this + + " message=" + + message + + " deliveryMode=" + + deliveryMode + + " priority=" + + priority + + " ttl=" + + timeToLive); + } + + checkState(); + + producer.send(message, deliveryMode, priority, timeToLive); + + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message); + } + } + finally + { + session.unlock(); + } + } + + /** + * Send message + * @param message The message + * @exception JMSException Thrown if an error occurs + */ + public void send(final Message message) throws JMSException + { + session.lock(); + try + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("send " + this + " message=" + message); + } + + checkState(); + + producer.send(message); + + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message); + } + } + finally + { + session.unlock(); + } + } + + /** + * Get the delivery mode + * @return The mode + * @exception JMSException Thrown if an error occurs + */ + public int getDeliveryMode() throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("getDeliveryMode()"); + } + + return producer.getDeliveryMode(); + } + + /** + * Get the destination + * @return The destination + * @exception JMSException Thrown if an error occurs + */ + public Destination getDestination() throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("getDestination()"); + } + + return producer.getDestination(); + } + + /** + * Disable message id + * @return True if disable + * @exception JMSException Thrown if an error occurs + */ + public boolean getDisableMessageID() throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + 
ActiveMQRALogger.LOGGER.trace("getDisableMessageID()"); + } + + return producer.getDisableMessageID(); + } + + /** + * Disable message timestamp + * @return True if disable + * @exception JMSException Thrown if an error occurs + */ + public boolean getDisableMessageTimestamp() throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("getDisableMessageTimestamp()"); + } + + return producer.getDisableMessageTimestamp(); + } + + /** + * Get the priority + * @return The priority + * @exception JMSException Thrown if an error occurs + */ + public int getPriority() throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("getPriority()"); + } + + return producer.getPriority(); + } + + /** + * Get the time to live + * @return The ttl + * @exception JMSException Thrown if an error occurs + */ + public long getTimeToLive() throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("getTimeToLive()"); + } + + return producer.getTimeToLive(); + } + + /** + * Set the delivery mode + * @param deliveryMode The mode + * @exception JMSException Thrown if an error occurs + */ + public void setDeliveryMode(final int deliveryMode) throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("setDeliveryMode(" + deliveryMode + ")"); + } + + producer.setDeliveryMode(deliveryMode); + } + + /** + * Set disable message id + * @param value The value + * @exception JMSException Thrown if an error occurs + */ + public void setDisableMessageID(final boolean value) throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("setDisableMessageID(" + value + ")"); + } + + producer.setDisableMessageID(value); + } + + /** + * Set disable message timestamp + * @param value The value + * @exception JMSException Thrown if an error occurs + */ + public void setDisableMessageTimestamp(final boolean value) 
throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("setDisableMessageTimestamp(" + value + ")"); + } + + producer.setDisableMessageTimestamp(value); + } + + /** + * Set the priority + * @param defaultPriority The value + * @exception JMSException Thrown if an error occurs + */ + public void setPriority(final int defaultPriority) throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("setPriority(" + defaultPriority + ")"); + } + + producer.setPriority(defaultPriority); + } + + /** + * Set the ttl + * @param timeToLive The value + * @exception JMSException Thrown if an error occurs + */ + public void setTimeToLive(final long timeToLive) throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("setTimeToLive(" + timeToLive + ")"); + } + + producer.setTimeToLive(timeToLive); + } + + @Override + public void setDeliveryDelay(long deliveryDelay) throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("setDeliveryDelay(" + deliveryDelay + ")"); + } + producer.setDeliveryDelay(deliveryDelay); + } + + @Override + public long getDeliveryDelay() throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("getDeliveryDelay()"); + } + return producer.getDeliveryDelay(); + } + + @Override + public void send(Message message, CompletionListener completionListener) throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("send(" + message + ", " + completionListener + ")"); + } + producer.send(message, completionListener); + } + + @Override + public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("send(" + message + ", " + deliveryMode + ", " + priority + ", " + 
timeToLive + + ", " + completionListener + ")"); + } + producer.send(message, deliveryMode, priority, timeToLive, completionListener); + } + + @Override + public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("send(" + destination + ", " + message + ", " + completionListener + ")"); + } + producer.send(destination, message, completionListener); + } + + @Override + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException + { + if (ActiveMQRAMessageProducer.trace) + { + ActiveMQRALogger.LOGGER.trace("send(" + destination + ", " + message + ", " + deliveryMode + ", " + priority + + ", " + timeToLive + ", " + completionListener + ")"); + } + producer.send(destination, message, deliveryMode, priority, timeToLive, completionListener); + } + + /** + * Check state + * @exception JMSException Thrown if an error occurs + */ + void checkState() throws JMSException + { + session.checkState(); + } + + /** + * Close producer + * @exception JMSException Thrown if an error occurs + */ + void closeProducer() throws JMSException + { + producer.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMetaData.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMetaData.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMetaData.java new file mode 100644 index 0000000..d30c44d --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAMetaData.java @@ -0,0 +1,107 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.ra; + +import javax.resource.ResourceException; +import javax.resource.spi.ManagedConnectionMetaData; + + +/** + * Managed connection meta data + * + * @author <a href="mailto:[email protected]">Adrian Brock</a> + * @author <a href="mailto:[email protected]">Jesper Pedersen</a> + */ +public class ActiveMQRAMetaData implements ManagedConnectionMetaData +{ + /** Trace enabled */ + private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled(); + + /** The managed connection */ + private final ActiveMQRAManagedConnection mc; + + /** + * Constructor + * @param mc The managed connection + */ + public ActiveMQRAMetaData(final ActiveMQRAManagedConnection mc) + { + if (ActiveMQRAMetaData.trace) + { + ActiveMQRALogger.LOGGER.trace("constructor(" + mc + ")"); + } + + this.mc = mc; + } + + /** + * Get the EIS product name + * @return The name + * @exception ResourceException Thrown if operation fails + */ + public String getEISProductName() throws ResourceException + { + if (ActiveMQRAMetaData.trace) + { + ActiveMQRALogger.LOGGER.trace("getEISProductName()"); + } + + return "ActiveMQ"; + } + + /** + * Get the EIS product version + * @return The version + * @exception ResourceException Thrown if operation fails + */ + public String getEISProductVersion() throws ResourceException + { + if (ActiveMQRAMetaData.trace) + { + 
ActiveMQRALogger.LOGGER.trace("getEISProductVersion()"); + } + + return "2.0"; + } + + /** + * Get the user name + * @return The user name + * @exception ResourceException Thrown if operation fails + */ + public String getUserName() throws ResourceException + { + if (ActiveMQRAMetaData.trace) + { + ActiveMQRALogger.LOGGER.trace("getUserName()"); + } + + return mc.getUserName(); + } + + /** + * Get the maximum number of connections -- RETURNS 0 + * @return The number + * @exception ResourceException Thrown if operation fails + */ + public int getMaxConnections() throws ResourceException + { + if (ActiveMQRAMetaData.trace) + { + ActiveMQRALogger.LOGGER.trace("getMaxConnections()"); + } + + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAObjectMessage.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAObjectMessage.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAObjectMessage.java new file mode 100644 index 0000000..3fc843e --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAObjectMessage.java @@ -0,0 +1,76 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
package org.apache.activemq.ra;

import java.io.Serializable;

import javax.jms.JMSException;
import javax.jms.ObjectMessage;


/**
 * A wrapper for an object message. The payload accessors are delegated to the
 * wrapped {@link ObjectMessage}; everything else comes from {@link ActiveMQRAMessage}.
 *
 * @author <a href="mailto:[email protected]">Adrian Brock</a>
 * @author <a href="mailto:[email protected]">Jesper Pedersen</a>
 */
public class ActiveMQRAObjectMessage extends ActiveMQRAMessage implements ObjectMessage
{
   /** Trace flag, read from the logger when the class is initialised */
   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();

   /**
    * Create a new wrapper
    * @param message the object message to delegate to
    * @param session the session
    */
   public ActiveMQRAObjectMessage(final ObjectMessage message, final ActiveMQRASession session)
   {
      super(message, session);

      if (trace)
      {
         ActiveMQRALogger.LOGGER.trace("constructor(" + message + ", " + session + ")");
      }
   }

   /**
    * Get the object
    * @return The payload of the wrapped message
    * @exception JMSException Thrown if an error occurs
    */
   public Serializable getObject() throws JMSException
   {
      if (trace)
      {
         ActiveMQRALogger.LOGGER.trace("getObject()");
      }

      // The constructor only accepts an ObjectMessage, hence the cast of the inherited field.
      final ObjectMessage delegate = (ObjectMessage)message;
      return delegate.getObject();
   }

   /**
    * Set the object
    * @param object The new payload for the wrapped message
    * @exception JMSException Thrown if an error occurs
    */
   public void setObject(final Serializable object) throws JMSException
   {
      if (trace)
      {
         ActiveMQRALogger.LOGGER.trace("setObject(" + object + ")");
      }

      final ObjectMessage delegate = (ObjectMessage)message;
      delegate.setObject(object);
   }
}
package org.apache.activemq.ra;

import java.io.Serializable;
import java.util.Hashtable;

import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.utils.DefaultSensitiveStringCodec;
import org.apache.activemq.utils.PasswordMaskingUtil;
import org.apache.activemq.utils.SensitiveDataCodec;


/**
 * The RA default properties - these are set in the ra.xml file.
 * <p>
 * The password is stored exactly as given by {@link #setPassword(String)};
 * {@link #init()} decodes it later when masked passwords are enabled.
 *
 * @author <a href="mailto:[email protected]">Adrian Brock</a>
 * @author <a href="mailto:[email protected]">Jesper Pedersen</a>
 * @author <a href="mailto:[email protected]">Andy Taylor</a>
 */
public class ActiveMQRAProperties extends ConnectionFactoryProperties implements Serializable
{
   /**
    * Serial version UID
    */
   static final long serialVersionUID = -2772367477755473248L;

   /**
    * Trace flag, read from the logger when the class is initialised
    */
   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();

   /**
    * The user name
    */
   private String userName;

   /**
    * The password; replaced by its decoded form in {@link #init()} when masking is on
    */
   private String password = null;

   /**
    * Use Local TX instead of XA
    */
   private Boolean localTx = false;


   /**
    * Class used to locate the Transaction Manager.
    * Using JBoss5 as the default locator
    */
   private String transactionManagerLocatorClass = "org.apache.activemq.integration.jboss.tm.JBoss5TransactionManagerLocator;org.apache.activemq.integration.jboss.tm.JBoss4TransactionManagerLocator";

   /**
    * Method used to locate the TM
    */
   private String transactionManagerLocatorMethod = "getTm;getTM";

   private static final int DEFAULT_SETUP_ATTEMPTS = -1;

   private static final long DEFAULT_SETUP_INTERVAL = 2 * 1000;

   private int setupAttempts = DEFAULT_SETUP_ATTEMPTS;

   private long setupInterval = DEFAULT_SETUP_INTERVAL;

   private Hashtable<?, ?> jndiParams;

   private boolean useJNDI;

   private boolean useMaskedPassword = false;

   private String passwordCodec;

   /** Set once {@link #init()} has completed; later calls return immediately */
   private boolean initialized = false;

   /** Codec chosen by {@link #init()}; transient, so it is not serialized */
   private transient SensitiveDataCodec<String> codecInstance;

   /**
    * Class used to get a JChannel
    */
   private String jgroupsChannelLocatorClass;

   /**
    * Name used to locate a JChannel
    */
   private String jgroupsChannelRefName;

   /**
    * Constructor
    */
   public ActiveMQRAProperties()
   {
      if (trace)
      {
         ActiveMQRALogger.LOGGER.trace("constructor()");
      }
   }

   /**
    * Get the user name
    *
    * @return The value
    */
   public String getUserName()
   {
      if (trace)
      {
         ActiveMQRALogger.LOGGER.trace("getUserName()");
      }

      return userName;
   }

   /**
    * Set the user name
    *
    * @param userName The value
    */
   public void setUserName(final String userName)
   {
      if (trace)
      {
         ActiveMQRALogger.LOGGER.trace("setUserName(" + userName + ")");
      }

      this.userName = userName;
   }

   /**
    * Get the password
    *
    * @return The value as currently stored
    */
   public String getPassword()
   {
      if (trace)
      {
         ActiveMQRALogger.LOGGER.trace("getPassword()");
      }

      return password;
   }

   /**
    * Set the password.
    * Depending on the UseMaskedPassword property the value is either plain text
    * or an encoded string. Which one cannot be decided here, because
    * UseMaskedPassword and PasswordCodec may not have been loaded yet, so the
    * value is only stored. The trace output never contains the value.
    *
    * @param password The value
    */
   public void setPassword(final String password)
   {
      if (trace)
      {
         ActiveMQRALogger.LOGGER.trace("setPassword(****)");
      }

      this.password = password;
   }

   /**
    * @return the useJNDI
    */
   public boolean isUseJNDI()
   {
      return useJNDI;
   }

   /**
    * @param value the useJNDI to set
    */
   public void setUseJNDI(final Boolean value)
   {
      useJNDI = value;
   }

   /**
    * @return return the jndi params to use
    */
   public Hashtable<?, ?> getParsedJndiParams()
   {
      return jndiParams;
   }


   /**
    * @param params the jndi params to use
    */
   public void setParsedJndiParams(Hashtable<?, ?> params)
   {
      jndiParams = params;
   }

   /**
    * Get the local-transaction flag (local TX is used instead of XA when set)
    *
    * @return The value
    */
   public Boolean getUseLocalTx()
   {
      if (trace)
      {
         ActiveMQRALogger.LOGGER.trace("getUseLocalTx()");
      }

      return localTx;
   }

   /**
    * Set the local-transaction flag (local TX is used instead of XA when set)
    *
    * @param localTx The value
    */
   public void setUseLocalTx(final Boolean localTx)
   {
      if (trace)
      {
         ActiveMQRALogger.LOGGER.trace("setUseLocalTx(" + localTx + ")");
      }

      this.localTx = localTx;
   }


   /**
    * @param transactionManagerLocatorClass the transaction manager locator class name(s)
    */
   public void setTransactionManagerLocatorClass(final String transactionManagerLocatorClass)
   {
      this.transactionManagerLocatorClass = transactionManagerLocatorClass;
   }

   /**
    * @return the transaction manager locator class name(s)
    */
   public String getTransactionManagerLocatorClass()
   {
      return transactionManagerLocatorClass;
   }

   /**
    * @return the transaction manager locator method name(s)
    */
   public String getTransactionManagerLocatorMethod()
   {
      return transactionManagerLocatorMethod;
   }

   /**
    * @param transactionManagerLocatorMethod the transaction manager locator method name(s)
    */
   public void setTransactionManagerLocatorMethod(final String transactionManagerLocatorMethod)
   {
      this.transactionManagerLocatorMethod = transactionManagerLocatorMethod;
   }

   /**
    * @return the number of setup attempts (defaults to -1)
    */
   public int getSetupAttempts()
   {
      return setupAttempts;
   }

   /**
    * @param setupAttempts the number of setup attempts
    */
   public void setSetupAttempts(Integer setupAttempts)
   {
      this.setupAttempts = setupAttempts;
   }

   /**
    * @return the setup interval (defaults to 2000)
    */
   public long getSetupInterval()
   {
      return setupInterval;
   }

   /**
    * @param setupInterval the setup interval
    */
   public void setSetupInterval(Long setupInterval)
   {
      this.setupInterval = setupInterval;
   }

   /**
    * @return whether the stored password is to be decoded by {@link #init()}
    */
   public boolean isUseMaskedPassword()
   {
      return useMaskedPassword;
   }

   /**
    * @param useMaskedPassword whether the stored password is to be decoded by {@link #init()}
    */
   public void setUseMaskedPassword(boolean useMaskedPassword)
   {
      this.useMaskedPassword = useMaskedPassword;
   }

   /**
    * @return the password codec specification, may be {@code null}
    */
   public String getPasswordCodec()
   {
      return passwordCodec;
   }

   /**
    * @param codecs the password codec specification
    */
   public void setPasswordCodec(String codecs)
   {
      passwordCodec = codecs;
   }

   /**
    * @return a description with the local-tx flag and user name; the password is always masked
    */
   @Override
   public String toString()
   {
      final String prefix = "ActiveMQRAProperties[localTx=" + localTx;
      return prefix + ", userName=" + userName + ", password=****]";
   }

   /**
    * Initialise these properties once; further calls are no-ops. When masked
    * passwords are enabled the codec is selected and the stored password decoded.
    * The instance is only marked initialised if that succeeds.
    *
    * @throws ActiveMQException if the codec cannot be obtained or the password cannot be decoded
    */
   public synchronized void init() throws ActiveMQException
   {
      if (initialized)
      {
         return;
      }

      if (useMaskedPassword)
      {
         decodeMaskedPassword();
      }

      initialized = true;
   }

   /**
    * Select the codec (the default one unless a password codec is configured) and
    * replace a non-null password by its decoded form.
    *
    * @throws ActiveMQException if the codec cannot be obtained or decoding fails
    */
   private void decodeMaskedPassword() throws ActiveMQException
   {
      // The default codec is assigned first, so it stays in place if getCodec fails.
      codecInstance = new DefaultSensitiveStringCodec();

      if (passwordCodec != null)
      {
         codecInstance = PasswordMaskingUtil.getCodec(passwordCodec);
      }

      if (password == null)
      {
         return;
      }

      try
      {
         password = codecInstance.decode(password);
      }
      catch (Exception e)
      {
         throw ActiveMQRABundle.BUNDLE.errorDecodingPassword(e);
      }
   }

   /**
    * @return the codec selected by {@link #init()}, or {@code null} if none was selected
    */
   public SensitiveDataCodec<String> getCodecInstance()
   {
      return codecInstance;
   }

   /**
    * @return the class used to get a JChannel
    */
   public String getJgroupsChannelLocatorClass()
   {
      return jgroupsChannelLocatorClass;
   }

   /**
    * @param jgroupsChannelLocatorClass the class used to get a JChannel
    */
   public void setJgroupsChannelLocatorClass(String jgroupsChannelLocatorClass)
   {
      this.jgroupsChannelLocatorClass = jgroupsChannelLocatorClass;
   }

   /**
    * @return the name used to locate a JChannel
    */
   public String getJgroupsChannelRefName()
   {
      return jgroupsChannelRefName;
   }

   /**
    * @param jgroupsChannelRefName the name used to locate a JChannel
    */
   public void setJgroupsChannelRefName(String jgroupsChannelRefName)
   {
      this.jgroupsChannelRefName = jgroupsChannelRefName;
   }

}
---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueReceiver.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueReceiver.java new file mode 100644 index 0000000..43c023e --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueReceiver.java @@ -0,0 +1,61 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.ra; + +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueReceiver; + +
/**
 * Queue-specific consumer wrapper: adds {@link QueueReceiver#getQueue()} on
 * top of the generic {@link ActiveMQRAMessageConsumer} wrapper.
 *
 * @author <a href="mailto:[email protected]">Adrian Brock</a>
 * @author <a href="mailto:[email protected]">Jesper Pedersen</a>
 */
public class ActiveMQRAQueueReceiver extends ActiveMQRAMessageConsumer implements QueueReceiver {
   /** Trace flag, read from the logger once when the class is loaded */
   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();

   /**
    * Wraps the given queue receiver.
    *
    * @param consumer the queue receiver to delegate to
    * @param session  the session the receiver belongs to
    */
   public ActiveMQRAQueueReceiver(final QueueReceiver consumer, final ActiveMQRASession session) {
      super(consumer, session);

      if (trace) {
         ActiveMQRALogger.LOGGER.trace("constructor(" + consumer + ", " + session + ")");
      }
   }

   /**
    * Returns the queue of the wrapped receiver after the state check has passed.
    *
    * @return the queue
    * @throws JMSException if the state check or the wrapped receiver fails
    */
   @Override
   public Queue getQueue() throws JMSException {
      if (trace) {
         ActiveMQRALogger.LOGGER.trace("getQueue()");
      }

      checkState();

      // the constructor only accepts a QueueReceiver, hence the cast of the inherited field
      final QueueReceiver receiver = (QueueReceiver) consumer;
      return receiver.getQueue();
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueSender.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueSender.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueSender.java new file mode 100644 index 0000000..ea72d21 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAQueueSender.java @@ -0,0 +1,138 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.ra; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.QueueSender; + +
/**
 * Queue-specific producer wrapper: adds the {@link QueueSender} operations on
 * top of the generic {@link ActiveMQRAMessageProducer} wrapper.
 *
 * @author <a href="[email protected]">Adrian Brock</a>
 * @author <a href="[email protected]">Jesper Pedersen</a>
 */
public class ActiveMQRAQueueSender extends ActiveMQRAMessageProducer implements QueueSender {
   /** Trace flag, read from the logger once when the class is loaded */
   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();

   /**
    * Wraps the given queue sender.
    *
    * @param producer the queue sender to delegate to
    * @param session  the session the sender belongs to
    */
   public ActiveMQRAQueueSender(final QueueSender producer, final ActiveMQRASession session) {
      super(producer, session);

      if (trace) {
         ActiveMQRALogger.LOGGER.trace("constructor(" + producer + ", " + session + ")");
      }
   }

   /**
    * Returns the queue of the wrapped sender. No state check and no session
    * locking is performed here.
    *
    * @return the queue
    * @throws JMSException if the wrapped sender fails
    */
   @Override
   public Queue getQueue() throws JMSException {
      if (trace) {
         ActiveMQRALogger.LOGGER.trace("getQueue()");
      }

      // the constructor only accepts a QueueSender, hence the cast of the inherited field
      final QueueSender sender = (QueueSender) producer;
      return sender.getQueue();
   }

   /**
    * Sends a message to the given queue with explicit delivery settings.
    * The session lock is held for the whole call and released even when the
    * state check or the send fails.
    *
    * @param destination  the destination
    * @param message      the message
    * @param deliveryMode the delivery mode
    * @param priority     the priority
    * @param timeToLive   the time to live
    * @throws JMSException if the state check or the wrapped producer fails
    */
   @Override
   public void send(final Queue destination,
                    final Message message,
                    final int deliveryMode,
                    final int priority,
                    final long timeToLive) throws JMSException {
      session.lock();
      try {
         if (trace) {
            final StringBuilder entry = new StringBuilder("send ").append(this);
            entry.append(" destination=").append(destination);
            entry.append(" message=").append(message);
            entry.append(" deliveryMode=").append(deliveryMode);
            entry.append(" priority=").append(priority);
            entry.append(" ttl=").append(timeToLive);
            ActiveMQRALogger.LOGGER.trace(entry.toString());
         }

         checkState();
         producer.send(destination, message, deliveryMode, priority, timeToLive);

         if (trace) {
            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
         }
      } finally {
         session.unlock();
      }
   }

   /**
    * Sends a message to the given queue using the wrapped producer's
    * two-argument send. The session lock is held for the whole call and
    * released even when the state check or the send fails.
    *
    * @param destination the destination
    * @param message     the message
    * @throws JMSException if the state check or the wrapped producer fails
    */
   @Override
   public void send(final Queue destination, final Message message) throws JMSException {
      session.lock();
      try {
         if (trace) {
            ActiveMQRALogger.LOGGER.trace("send " + this + " destination=" + destination + " message=" + message);
         }

         checkState();
         producer.send(destination, message);

         if (trace) {
            ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
         }
      } finally {
         session.unlock();
      }
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAService.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAService.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAService.java new file mode 100644 index 0000000..430e855 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRAService.java @@ -0,0 +1,82 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.ra; + +import java.util.Set; + +import javax.management.MBeanServer; +import javax.management.ObjectInstance; +import javax.management.ObjectName; + + +/** + * A ActiveMQRAService ensures that ActiveMQ Resource Adapter will be stopped *before* the ActiveMQ server. + * https://jira.jboss.org/browse/HORNETQ-339 + * + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * + * + */ +public class ActiveMQRAService +{ + // Constants ----------------------------------------------------- + // Attributes ---------------------------------------------------- + + private final MBeanServer mBeanServer; + + private final String resourceAdapterObjectName; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public ActiveMQRAService(final MBeanServer mBeanServer, final String resourceAdapterObjectName) + { + this.mBeanServer = mBeanServer; + this.resourceAdapterObjectName = resourceAdapterObjectName; + } + + // Public -------------------------------------------------------- + + public void stop() + { + try + { + ObjectName objectName = new ObjectName(resourceAdapterObjectName); + Set<ObjectInstance> mbeanSet = mBeanServer.queryMBeans(objectName, null); + + for (ObjectInstance mbean : mbeanSet) + { + String stateString = (String)mBeanServer.getAttribute(mbean.getObjectName(), "StateString"); + + if ("Started".equalsIgnoreCase(stateString) || "Starting".equalsIgnoreCase(stateString)) + { + mBeanServer.invoke(mbean.getObjectName(), "stop", new 
Object[0], new String[0]); + } + } + } + catch (Exception e) + { + ActiveMQRALogger.LOGGER.errorStoppingRA(e); + } + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +}
