http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSMessageListenerWrapper.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSMessageListenerWrapper.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSMessageListenerWrapper.java new file mode 100644 index 0000000..5f339ae --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSMessageListenerWrapper.java @@ -0,0 +1,152 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.jms.client; + +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.client.ClientConsumer; +import org.apache.activemq6.api.core.client.ClientMessage; +import org.apache.activemq6.api.core.client.MessageHandler; +import org.apache.activemq6.api.jms.HornetQJMSConstants; + +/** + * + * A JMSMessageListenerWrapper + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public class JMSMessageListenerWrapper implements MessageHandler +{ + private final HornetQConnection connection; + + private final HornetQSession session; + + private final MessageListener listener; + + private final ClientConsumer consumer; + + private final boolean transactedOrClientAck; + + private final boolean individualACK; + + protected JMSMessageListenerWrapper(final HornetQConnection connection, + final HornetQSession session, + final ClientConsumer consumer, + final MessageListener listener, + final int ackMode) + { + this.connection = connection; + + this.session = session; + + this.consumer = consumer; + + this.listener = listener; + + transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA(); + + individualACK = (ackMode == HornetQJMSConstants.INDIVIDUAL_ACKNOWLEDGE); + } + + /** + * In this method we apply the JMS acknowledgement and redelivery semantics + * as per JMS spec + */ + public void onMessage(final ClientMessage message) + { + HornetQMessage msg = HornetQMessage.createMessage(message, session.getCoreSession()); + + if (individualACK) + { + msg.setIndividualAcknowledge(); + } + + try + { + msg.doBeforeReceive(); + } + catch (Exception e) + { + HornetQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(e); + + return; + } + + if (transactedOrClientAck) + { + try + { + message.acknowledge(); + } + catch (HornetQException e) + { + 
HornetQJMSClientLogger.LOGGER.errorProcessingMessage(e); + } + } + + try + { + connection.getThreadAwareContext().setCurrentThread(false); + listener.onMessage(msg); + } + catch (RuntimeException e) + { + // See JMS 1.1 spec, section 4.5.2 + + HornetQJMSClientLogger.LOGGER.onMessageError(e); + + if (!transactedOrClientAck) + { + try + { + if (individualACK) + { + message.individualAcknowledge(); + } + + session.getCoreSession().rollback(true); + + session.setRecoverCalled(true); + } + catch (Exception e2) + { + HornetQJMSClientLogger.LOGGER.errorRecoveringSession(e2); + } + } + } + finally + { + connection.getThreadAwareContext().clearCurrentThread(false); + } + if (!session.isRecoverCalled() && !individualACK) + { + try + { + // We don't want to call this if the consumer was closed from inside onMessage + if (!consumer.isClosed() && !transactedOrClientAck) + { + message.acknowledge(); + } + } + catch (HornetQException e) + { + HornetQJMSClientLogger.LOGGER.errorProcessingMessage(e); + } + } + + session.setRecoverCalled(false); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JmsExceptionUtils.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JmsExceptionUtils.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JmsExceptionUtils.java new file mode 100644 index 0000000..1f75bce --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JmsExceptionUtils.java @@ -0,0 +1,100 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +/** + * + */ +package org.apache.activemq6.jms.client; + +import javax.jms.IllegalStateRuntimeException; +import javax.jms.InvalidClientIDException; +import javax.jms.InvalidClientIDRuntimeException; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidDestinationRuntimeException; +import javax.jms.InvalidSelectorException; +import javax.jms.InvalidSelectorRuntimeException; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.JMSSecurityException; +import javax.jms.JMSSecurityRuntimeException; +import javax.jms.MessageFormatException; +import javax.jms.MessageFormatRuntimeException; +import javax.jms.MessageNotWriteableException; +import javax.jms.MessageNotWriteableRuntimeException; +import javax.jms.ResourceAllocationException; +import javax.jms.ResourceAllocationRuntimeException; +import javax.jms.TransactionInProgressException; +import javax.jms.TransactionInProgressRuntimeException; +import javax.jms.TransactionRolledBackException; +import javax.jms.TransactionRolledBackRuntimeException; + +/** + * + */ +public final class JmsExceptionUtils +{ + private JmsExceptionUtils() + { + // utility class + } + + /** + * Converts instances of sub-classes of {@link JMSException} into the corresponding sub-class of + * {@link JMSRuntimeException}. 
+ * @param e + * @return + */ + public static JMSRuntimeException convertToRuntimeException(JMSException e) + { + if (e instanceof javax.jms.IllegalStateException) + { + return new IllegalStateRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + if (e instanceof InvalidClientIDException) + { + return new InvalidClientIDRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + if (e instanceof InvalidDestinationException) + { + return new InvalidDestinationRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + if (e instanceof InvalidSelectorException) + { + return new InvalidSelectorRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + if (e instanceof JMSSecurityException) + { + return new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + if (e instanceof MessageFormatException) + { + return new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + if (e instanceof MessageNotWriteableException) + { + return new MessageNotWriteableRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + if (e instanceof ResourceAllocationException) + { + return new ResourceAllocationRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + if (e instanceof TransactionInProgressException) + { + return new TransactionInProgressRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + if (e instanceof TransactionRolledBackException) + { + return new TransactionRolledBackRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/SelectorTranslator.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/SelectorTranslator.java 
b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/SelectorTranslator.java new file mode 100644 index 0000000..34ef757 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/SelectorTranslator.java @@ -0,0 +1,149 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import java.util.ArrayList; +import java.util.List; + +/** + * + * This class converts a JMS selector expression into a HornetQ core filter expression. + * + * JMS selector and HornetQ filters use the same syntax but have different identifiers. + * + * We basically just need to replace the JMS header and property Identifier names + * with the corresponding HornetQ field and header Identifier names. + * + * We must be careful not to substitute any literals, or identifiers whose name contains the name + * of one we want to substitute. + * + * This makes it less trivial than a simple search and replace. 
/**
 * Converts a JMS selector expression into a HornetQ core filter expression.
 * <p>
 * JMS selectors and HornetQ filters share the same syntax but use different identifiers, so
 * the translation replaces the JMS header identifiers with the corresponding HornetQ ones.
 * <p>
 * A plain search-and-replace is not enough: text inside string literals must be left alone,
 * and an identifier that merely contains one of the JMS names must not be touched.
 *
 * @author <a href="mailto:[email protected]">Tim Fox</a>
 */
public class SelectorTranslator
{
   private static final char QUOTE = '\'';

   /**
    * Ordered {match, replacement} pairs, applied one after the other. Entries that start with
    * a quote are string literals and are matched inside quoted text. Those literals are
    * rewritten wherever they stand alone, not only next to JMSDeliveryMode.
    */
   private static final String[][] SUBSTITUTIONS =
   {
      {"JMSDeliveryMode", "HQDurable"},
      {"'PERSISTENT'", "'DURABLE'"},
      {"'NON_PERSISTENT'", "'NON_DURABLE'"},
      {"JMSPriority", "HQPriority"},
      {"JMSTimestamp", "HQTimestamp"},
      {"JMSMessageID", "HQUserID"},
      {"JMSExpiration", "HQExpiration"}
   };

   /**
    * Translates a JMS selector into the equivalent HornetQ filter string.
    *
    * @param selectorString the JMS selector, may be {@code null}
    * @return the translated filter, or {@code null} when {@code selectorString} is {@code null}
    */
   public static String convertToHornetQFilterString(final String selectorString)
   {
      if (selectorString == null)
      {
         return null;
      }

      String filterString = selectorString;

      for (String[] substitution : SUBSTITUTIONS)
      {
         filterString = SelectorTranslator.parse(filterString, substitution[0], substitution[1]);
      }

      return filterString;
   }

   /**
    * Replaces every standalone occurrence of {@code match} in {@code input} with {@code replace}.
    *
    * @param input   text to scan
    * @param match   non-empty token to look for; a leading quote means it is a string literal
    * @param replace text substituted for each accepted occurrence
    * @return the rewritten text, or {@code input} itself when nothing was replaced
    */
   private static String parse(final String input, final String match, final String replace)
   {
      final List<Integer> positions = findStandaloneMatches(input, match);

      if (positions.isEmpty())
      {
         return input;
      }

      // The buffer never leaves this method, so the unsynchronized builder is sufficient.
      final StringBuilder buff = new StringBuilder();

      int startPos = 0;

      for (int pos : positions)
      {
         buff.append(input, startPos, pos);
         buff.append(replace);
         startPos = pos + match.length();
      }

      if (startPos < input.length())
      {
         buff.append(input, startPos, input.length());
      }

      return buff.toString();
   }

   /** Collects the start index of every occurrence of match that passes the boundary check. */
   private static List<Integer> findStandaloneMatches(final String input, final String match)
   {
      final List<Integer> positions = new ArrayList<Integer>();

      // Identifier matches are skipped inside quoted text; literal matches are not.
      final boolean replaceInQuotes = match.charAt(0) == QUOTE;

      boolean inQuote = false;
      int matchPos = 0;

      for (int i = 0; i < input.length(); i++)
      {
         final char c = input.charAt(i);

         if (c == QUOTE)
         {
            inQuote = !inQuote;
         }

         final boolean eligible = replaceInQuotes || !inQuote;

         if (!eligible || c != match.charAt(matchPos))
         {
            matchPos = 0;
            continue;
         }

         matchPos++;

         if (matchPos == match.length())
         {
            matchPos = 0;

            final int start = i - match.length() + 1;

            if (isStandalone(input, start, i))
            {
               positions.add(start);
            }
         }
      }

      return positions;
   }

   /** Returns true when the span is not embedded in a longer identifier on either side. */
   private static boolean isStandalone(final String input, final int start, final int end)
   {
      final int after = end + 1;

      if (after < input.length() && Character.isJavaIdentifierPart(input.charAt(after)))
      {
         return false;
      }

      final int before = start - 1;

      return before < 0 || !Character.isJavaIdentifierPart(input.charAt(before));
   }
}
+ */ +package org.apache.activemq6.jms.client; + +import org.apache.activemq6.utils.ConcurrentHashSet; + +import javax.jms.IllegalStateException; +import java.util.Set; + +/** + * Restricts what can be called on context passed in wrapped CompletionListener. + */ +public class ThreadAwareContext +{ + /** + * Necessary in order to assert some methods ({@link javax.jms.JMSContext#stop()} + * {@link javax.jms.JMSContext#close()} etc) are not getting called from within a + * {@link javax.jms.CompletionListener}. + * @see ThreadAwareContext#assertNotMessageListenerThread() + */ + private Thread completionListenerThread; + + /** + * Use a set because JMSContext can create more than one JMSConsumer + * to receive asynchronously from different destinations. + */ + private Set<Long> messageListenerThreads = new ConcurrentHashSet<Long>(); + + /** + * Sets current thread to the context + * <p> + * Meant to inform an JMSContext which is the thread that CANNOT call some of its methods. + * </p> + * @param isCompletionListener : indicating whether current thread is from CompletionListener + * or from MessageListener. + */ + public void setCurrentThread(boolean isCompletionListener) + { + if (isCompletionListener) + { + completionListenerThread = Thread.currentThread(); + } + else + { + messageListenerThreads.add(Thread.currentThread().getId()); + } + } + + + /** + * Clear current thread from the context + * + * @param isCompletionListener : indicating whether current thread is from CompletionListener + * or from MessageListener. + */ + public void clearCurrentThread(boolean isCompletionListener) + { + if (isCompletionListener) + { + completionListenerThread = null; + } + else + { + messageListenerThreads.remove(Thread.currentThread().getId()); + } + } + + /** + * Asserts a {@link javax.jms.CompletionListener} is not calling from its own {@link javax.jms.JMSContext}. 
+ * <p> + * Note that the code must work without any need for further synchronization, as there is the + * requirement that only one CompletionListener be called at a time. In other words, + * CompletionListener calling is single-threaded. + * @see javax.jms.JMSContext#close() + * @see javax.jms.JMSContext#stop() + * @see javax.jms.JMSContext#commit() + * @see javax.jms.JMSContext#rollback() + */ + public void assertNotCompletionListenerThreadRuntime() + { + if (completionListenerThread == Thread.currentThread()) + { + throw HornetQJMSClientBundle.BUNDLE.callingMethodFromCompletionListenerRuntime(); + } + } + + /** + * Asserts a {@link javax.jms.CompletionListener} is not calling from its own {@link javax.jms.Connection} or from + * a {@link javax.jms.MessageProducer} . + * <p> + * Note that the code must work without any need for further synchronization, as there is the + * requirement that only one CompletionListener be called at a time. In other words, + * CompletionListener calling is single-threaded. + * + * @see javax.jms.Connection#close() + * @see javax.jms.MessageProducer#close() + */ + public void assertNotCompletionListenerThread() throws javax.jms.IllegalStateException + { + if (completionListenerThread == Thread.currentThread()) + { + throw HornetQJMSClientBundle.BUNDLE.callingMethodFromCompletionListener(); + } + } + + /** + * Asserts a {@link javax.jms.MessageListener} is not calling from its own {@link javax.jms.JMSContext}. + * <p> + * Note that the code must work without any need for further synchronization, as there is the + * requirement that only one MessageListener be called at a time. In other words, + * MessageListener calling is single-threaded. 
+ * @see javax.jms.JMSContext#close() + * @see javax.jms.JMSContext#stop() + */ + public void assertNotMessageListenerThreadRuntime() + { + if (messageListenerThreads.contains(Thread.currentThread().getId())) + { + throw HornetQJMSClientBundle.BUNDLE.callingMethodFromListenerRuntime(); + } + } + + /** + * Asserts a {@link javax.jms.MessageListener} is not calling from its own {@link javax.jms.Connection} or + * {@link javax.jms.MessageConsumer}. + * <p> + * Note that the code must work without any need for further synchronization, as there is the + * requirement that only one MessageListener be called at a time. In other words, + * MessageListener calling is single-threaded. + * + * @see javax.jms.Connection#close() + * @see javax.jms.MessageConsumer#close() + */ + public void assertNotMessageListenerThread() throws IllegalStateException + { + if (messageListenerThreads.contains(Thread.currentThread().getId())) + { + throw HornetQJMSClientBundle.BUNDLE.callingMethodFromListener(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/package-info.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/package-info.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/package-info.java new file mode 100644 index 0000000..69314b5 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/package-info.java @@ -0,0 +1,22 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +/** + * Implementation of the JMS API. + * <br> + * Classes in this package are not meant to be used directly + * except {@link org.apache.activemq6.jms.client.HornetQConnectionFactory} which can be instantiated directly + * if JMS resources are not looked up in JNDI. + * + */ +package org.apache.activemq6.jms.client; + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/ConnectionFactoryObjectFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/ConnectionFactoryObjectFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/ConnectionFactoryObjectFactory.java new file mode 100644 index 0000000..ee1d271 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/ConnectionFactoryObjectFactory.java @@ -0,0 +1,43 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.referenceable; + +import java.util.Hashtable; + +import javax.naming.Context; +import javax.naming.Name; +import javax.naming.Reference; +import javax.naming.spi.ObjectFactory; + +/** + * + * A ConnectionFactoryObjectFactory. + * + * Given a reference - reconstructs a HornetQRAConnectionFactory + * + * @author <a href="[email protected]">Tim Fox</a> + * @version $Revision$ + * + */ +public class ConnectionFactoryObjectFactory implements ObjectFactory +{ + public Object getObjectInstance(final Object ref, final Name name, final Context ctx, final Hashtable props) throws Exception + { + Reference r = (Reference)ref; + + byte[] bytes = (byte[])r.get("HornetQ-CF").getContent(); + + // Deserialize + return SerializableObjectRefAddr.deserialize(bytes); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/DestinationObjectFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/DestinationObjectFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/DestinationObjectFactory.java new file mode 100644 index 0000000..e27e50e --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/DestinationObjectFactory.java @@ -0,0 +1,43 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.referenceable; + +import java.util.Hashtable; + +import javax.naming.Context; +import javax.naming.Name; +import javax.naming.Reference; +import javax.naming.spi.ObjectFactory; + +/** + * + * A DestinationObjectFactory. + * + * Given a Reference - reconstructs a HornetQDestination + * + * @author <a href="[email protected]">Tim Fox</a> + * @version $Revision$ + * + */ +public class DestinationObjectFactory implements ObjectFactory +{ + public Object getObjectInstance(final Object ref, final Name name, final Context ctx, final Hashtable props) throws Exception + { + Reference r = (Reference)ref; + + byte[] bytes = (byte[])r.get("HornetQ-DEST").getContent(); + + // Deserialize + return SerializableObjectRefAddr.deserialize(bytes); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/SerializableObjectRefAddr.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/SerializableObjectRefAddr.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/SerializableObjectRefAddr.java new file mode 100644 index 0000000..55a442e --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/referenceable/SerializableObjectRefAddr.java @@ -0,0 +1,79 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
/**
 * A {@link RefAddr} usable for any serializable object: the address content is the
 * serialized form of the object as a byte[].
 *
 * @author <a href="[email protected]">Tim Fox</a>
 * @version $Revision$
 */
public class SerializableObjectRefAddr extends RefAddr
{
   private static final long serialVersionUID = 9158134548376171898L;

   /** Serialized form of the object passed to the constructor. */
   private final byte[] bytes;

   /**
    * Serializes {@code content} and stores the bytes as the address content.
    *
    * @param type    the address type
    * @param content the object to serialize
    * @throws NamingException if serialization fails; the causing {@link IOException} is
    *                         available as the root cause
    */
   public SerializableObjectRefAddr(final String type, final Object content) throws NamingException
   {
      super(type);

      try
      {
         bytes = serialize(content);
      }
      catch (IOException e)
      {
         final NamingException failure = new NamingException("Failed to serialize object:" + content + ", " + e.getMessage());

         // Keep the original exception so the failing class can be diagnosed.
         failure.setRootCause(e);

         throw failure;
      }
   }

   /**
    * @return the serialized bytes as a byte[]. The internal array itself is returned, so that
    *         repeated calls yield the same instance.
    */
   @Override
   public Object getContent()
   {
      return bytes;
   }

   /**
    * Rebuilds an object from bytes produced by this class.
    * <p>
    * NOTE(review): this uses native Java deserialization with no class filtering. Confirm
    * that callers only pass bytes from a trusted source.
    *
    * @param bytes serialized form of a single object
    * @return the deserialized object
    * @throws IOException            if the bytes are not a valid serialization stream
    * @throws ClassNotFoundException if the class of the serialized object cannot be found
    */
   public static Object deserialize(final byte[] bytes) throws IOException, ClassNotFoundException
   {
      final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));

      try
      {
         return ois.readObject();
      }
      finally
      {
         ois.close();
      }
   }

   /** Writes content to an in-memory object stream and returns the resulting bytes. */
   private static byte[] serialize(final Object content) throws IOException
   {
      final ByteArrayOutputStream bos = new ByteArrayOutputStream();

      final ObjectOutputStream oos = new ObjectOutputStream(bos);

      try
      {
         oos.writeObject(content);

         oos.flush();

         return bos.toByteArray();
      }
      finally
      {
         oos.close();
      }
   }
}
<groupId>org.jboss.jbossts.jts</groupId> + <artifactId>jbossjts-jacorb</artifactId> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.jboss</groupId> + <artifactId>jboss-transaction-spi</artifactId> + </dependency> + <dependency> + <groupId>org.jboss.naming</groupId> + <artifactId>jnpserver</artifactId> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>release</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.9</version> + <configuration> + <doclet>org.jboss.apiviz.APIviz</doclet> + <docletArtifact> + <groupId>org.jboss.apiviz</groupId> + <artifactId>apiviz</artifactId> + <version>1.3.2.GA</version> + </docletArtifact> + <useStandardDocletOptions>true</useStandardDocletOptions> + <minmemory>128m</minmemory> + <maxmemory>512m</maxmemory> + <quiet>false</quiet> + <aggregate>true</aggregate> + <excludePackageNames>org.hornetq.core:org.hornetq.utils</excludePackageNames> + </configuration> + <executions> + <execution> + <id>javadocs</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/ConnectionFactoryFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/ConnectionFactoryFactory.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/ConnectionFactoryFactory.java new file mode 100644 index 0000000..158b114 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/ConnectionFactoryFactory.java @@ -0,0 +1,25 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
/**
 * Factory for connection factory objects.
 * <p>
 * The result is typed as {@link Object}; the concrete type is chosen by the implementation.
 *
 * @author <a href="mailto:[email protected]">Tim Fox</a>
 */
public interface ConnectionFactoryFactory
{
   /**
    * Creates a connection factory.
    *
    * @return the connection factory object
    * @throws Exception if the connection factory cannot be created
    */
   Object createConnectionFactory() throws Exception;
}
/*
 * See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.jms.bridge;

import javax.jms.Destination;

/**
 * A factory that hands out a JMS {@link Destination}.
 * <p>
 * {@link JMSBridge} accepts instances of this interface through
 * {@code setSourceDestinationFactory} and {@code setTargetDestinationFactory}.
 *
 * @author <a href="mailto:[email protected]">Tim Fox</a>
 * @version <tt>$Revision: $</tt>10 Oct 2007
 */
public interface DestinationFactory
{
   /**
    * Creates the destination.
    *
    * @return the JMS destination
    * @throws Exception if the destination cannot be provided
    */
   Destination createDestination() throws Exception;
}

/*
 * ---- next file in this archive ----
 * http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/HornetQJMSBridgeLogger.java
 * ----------------------------------------------------------------------
 * diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/HornetQJMSBridgeLogger.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/HornetQJMSBridgeLogger.java
 * new file mode 100644
 * index 0000000..edae392
 * --- /dev/null
 * +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/HornetQJMSBridgeLogger.java
 * @@ -0,0 +1,101 @@
 */
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
/* End of the license header of HornetQJMSBridgeLogger.java (the header text precedes this point in the archive). */

package org.apache.activemq6.jms.bridge;

import javax.management.ObjectName;

import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.LogMessage;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;

/**
 * Typed message logger for the JMS bridge.
 * <p>
 * Logger Code 34
 * <p>
 * Each message id must be 6 digits long starting with 34; the 3rd digit denotes the level:
 * <pre>
 * INFO  1
 * WARN  2
 * DEBUG 3
 * ERROR 4
 * TRACE 5
 * FATAL 6
 * </pre>
 * so an INFO message would be 341000 to 341999.
 * <p>
 * Every message uses {@link Message.Format#MESSAGE_FORMAT}, so parameters are referenced
 * as <code>{0}</code>, <code>{1}</code>, ... in the message text.
 *
 * @author <a href="mailto:[email protected]">Martyn Taylor</a>
 */
@MessageLogger(projectCode = "HQ")
public interface HornetQJMSBridgeLogger extends BasicLogger
{
   /**
    * The default logger. Its category is the name of this interface's package.
    */
   HornetQJMSBridgeLogger LOGGER = Logger.getMessageLogger(HornetQJMSBridgeLogger.class, HornetQJMSBridgeLogger.class.getPackage().getName());

   // ---- INFO (3410xx) ----

   /**
    * Reports that the bridge connections could not be set up and that a retry will follow.
    *
    * @param failureRetryInterval pause before the next attempt, in milliseconds
    */
   @LogMessage(level = Logger.Level.INFO)
   @Message(id = 341000, value = "Failed to set up JMS bridge connections. Most probably the source or target servers are unavailable." +
      " Will retry after a pause of {0} ms", format = Message.Format.MESSAGE_FORMAT)
   void failedToSetUpBridge(long failureRetryInterval);

   /** Reports a successful reconnection to the servers. */
   @LogMessage(level = Logger.Level.INFO)
   @Message(id = 341001, value = "JMS Bridge Succeeded in reconnecting to servers", format = Message.Format.MESSAGE_FORMAT)
   void bridgeReconnected();

   /** Reports a successful connection to the servers. */
   @LogMessage(level = Logger.Level.INFO)
   @Message(id = 341002, value = "Succeeded in connecting to servers", format = Message.Format.MESSAGE_FORMAT)
   void bridgeConnected();

   // ---- WARN (3420xx) ----

   /** Reports an attempt to start a bridge that is already started. */
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 342000, value = "Attempt to start JMS Bridge, but is already started", format = Message.Format.MESSAGE_FORMAT)
   void errorBridgeAlreadyStarted();

   /** Reports that the bridge failed to start. */
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 342001, value = "Failed to start JMS Bridge", format = Message.Format.MESSAGE_FORMAT)
   void errorStartingBridge();

   /**
    * Reports that the bridge could not be unregistered.
    *
    * @param objectName the JMX name that could not be unregistered
    */
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 342002, value = "Failed to unregister JMS Bridge {0}", format = Message.Format.MESSAGE_FORMAT)
   void errorUnregisteringBridge(ObjectName objectName);

   /** Reports that connections could not be set up and the bridge will be stopped. */
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 342003, value = "JMS Bridge unable to set up connections, bridge will be stopped", format = Message.Format.MESSAGE_FORMAT)
   void errorConnectingBridge();

   /**
    * Reports that the bridge will retry after a pause.
    *
    * @param failureRetryInterval pause before the next attempt, in milliseconds
    */
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 342004, value = "JMS Bridge Will retry after a pause of {0} ms", format = Message.Format.MESSAGE_FORMAT)
   void bridgeRetry(long failureRetryInterval);

   /** Reports that connections could not be set up and the bridge will not be started. */
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 342005, value = "JMS Bridge unable to set up connections, bridge will not be started", format = Message.Format.MESSAGE_FORMAT)
   void bridgeNotStarted();

   /**
    * Reports a failure detected on a bridge connection.
    *
    * @param e the cause, logged with the message
    */
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 342006, value = "Detected failure on bridge connection", format = Message.Format.MESSAGE_FORMAT)
   void bridgeFailure(@Cause Exception e);

   /**
    * Reports that sending and acknowledging a batch failed and the JMS objects are being closed.
    *
    * @param e the cause, logged with the message
    */
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 342009, value = "JMS Bridge failed to send + acknowledge batch, closing JMS objects", format = Message.Format.MESSAGE_FORMAT)
   void bridgeAckError(@Cause Exception e);

   /**
    * Reports that the bridge failed to connect.
    *
    * @param e the cause, logged with the message
    */
   @LogMessage(level = Logger.Level.WARN)
   @Message(id = 342010, value = "Failed to connect JMS Bridge", format = Message.Format.MESSAGE_FORMAT)
   void bridgeConnectError(@Cause Exception e);

   // ---- ERROR (3440xx) ----

   /**
    * Reports that the source connection could not be started.
    *
    * @param e the cause, logged with the message
    */
   @LogMessage(level = Logger.Level.ERROR)
   @Message(id = 344001, value = "Failed to start source connection", format = Message.Format.MESSAGE_FORMAT)
   void jmsBridgeSrcConnectError(@Cause Exception e);
}

/*
 * ---- next file in this archive ----
 * http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/JMSBridge.java
 * ----------------------------------------------------------------------
 * diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/JMSBridge.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/JMSBridge.java
 * new file mode 100644
 * index 0000000..ca86213
 * --- /dev/null
 * +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/JMSBridge.java
 * @@ -0,0 +1,115 @@
 */
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
/* End of the license header of JMSBridge.java (the header text precedes this point in the archive). */
package org.apache.activemq6.jms.bridge;

import javax.transaction.TransactionManager;

import org.apache.activemq6.core.server.HornetQComponent;

/**
 * Contract of a JMS bridge component: pause/resume control plus accessors for the
 * bridge's configuration properties, on top of whatever {@link HornetQComponent} declares.
 * <p>
 * NOTE(review): this interface only declares the properties; their exact semantics
 * (units, defaults, when a change takes effect) are defined by the implementation,
 * which is not part of this file - confirm there before relying on them.
 *
 * @author <a href="mailto:[email protected]">Tim Fox</a>
 * @author <a href="mailto:[email protected]">Jeff Mesnil</a>
 */
public interface JMSBridge extends HornetQComponent
{
   // ---- pause / resume ----

   void pause() throws Exception;

   void resume() throws Exception;

   // ---- source and target destinations ----

   DestinationFactory getSourceDestinationFactory();

   void setSourceDestinationFactory(DestinationFactory dest);

   DestinationFactory getTargetDestinationFactory();

   void setTargetDestinationFactory(DestinationFactory dest);

   // ---- source and target credentials ----

   String getSourceUsername();

   void setSourceUsername(String name);

   String getSourcePassword();

   void setSourcePassword(String pwd);

   String getTargetUsername();

   void setTargetUsername(String name);

   String getTargetPassword();

   void setTargetPassword(String pwd);

   // ---- message selection ----

   String getSelector();

   void setSelector(String selector);

   // ---- failure handling ----
   // The logger in this package reports the retry pause in "ms"; presumably this
   // interval is in milliseconds too - TODO confirm against the implementation.

   long getFailureRetryInterval();

   void setFailureRetryInterval(long interval);

   int getMaxRetries();

   void setMaxRetries(int retries);

   // ---- delivery guarantee and batching ----

   QualityOfServiceMode getQualityOfServiceMode();

   void setQualityOfServiceMode(QualityOfServiceMode mode);

   int getMaxBatchSize();

   void setMaxBatchSize(int size);

   long getMaxBatchTime();

   void setMaxBatchTime(long time);

   // ---- subscription identity ----

   String getSubscriptionName();

   void setSubscriptionName(String subname);

   String getClientID();

   void setClientID(String clientID);

   // ---- transaction manager lookup ----

   String getTransactionManagerLocatorClass();

   void setTransactionManagerLocatorClass(String transactionManagerLocatorClass);

   String getTransactionManagerLocatorMethod();

   void setTransactionManagerLocatorMethod(String transactionManagerLocatorMethod);

   // ---- message id header ----

   boolean isAddMessageIDInHeader();

   void setAddMessageIDInHeader(boolean value);

   // ---- state ----

   boolean isPaused();

   boolean isFailed();

   // ---- collaborators injected by the owner of the bridge ----

   void setSourceConnectionFactoryFactory(ConnectionFactoryFactory cff);

   void setTargetConnectionFactoryFactory(ConnectionFactoryFactory cff);

   void setTransactionManager(TransactionManager tm);

   // ---- password masking ----

   boolean isUseMaskedPassword();

   void setUseMaskedPassword(boolean maskPassword);

   String getPasswordCodec();

   void setPasswordCodec(String codec);

}

/*
 * \ No newline at end of file
 *
 * ---- next file in this archive ----
 * http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/JMSBridgeControl.java
 * ----------------------------------------------------------------------
 * diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/JMSBridgeControl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/JMSBridgeControl.java
 * new file mode 100644
 * index 0000000..207dde2
 * --- /dev/null
 * +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/JMSBridgeControl.java
 * @@ -0,0 +1,93 @@
 */
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
/* End of the license header of JMSBridgeControl.java (the header text precedes this point in the archive). */
package org.apache.activemq6.jms.bridge;

import org.apache.activemq6.api.core.management.HornetQComponentControl;

/**
 * A JMSBridgeControl: the management view of a {@link JMSBridge}.
 * <p>
 * It mirrors the simple-typed properties of {@link JMSBridge}. The quality of service
 * mode is exposed as a {@code String} here rather than as {@link QualityOfServiceMode};
 * {@code JMSBridgeControlImpl} converts using the enum constant name.
 * <p>
 * NOTE(review): property semantics are defined by the bridge implementation, which is
 * not part of this file - confirm there before relying on them.
 *
 * @author <a href="[email protected]">Jose de Castro</a>
 */
public interface JMSBridgeControl extends HornetQComponentControl
{
   // ---- pause / resume ----

   void pause() throws Exception;

   void resume() throws Exception;

   // ---- source and target credentials ----

   String getSourceUsername();

   void setSourceUsername(String name);

   String getSourcePassword();

   void setSourcePassword(String pwd);

   String getTargetUsername();

   void setTargetUsername(String name);

   String getTargetPassword();

   void setTargetPassword(String pwd);

   // ---- message selection ----

   String getSelector();

   void setSelector(String selector);

   // ---- failure handling ----

   long getFailureRetryInterval();

   void setFailureRetryInterval(long interval);

   int getMaxRetries();

   void setMaxRetries(int retries);

   // ---- delivery guarantee and batching ----

   String getQualityOfServiceMode();

   void setQualityOfServiceMode(String mode);

   int getMaxBatchSize();

   void setMaxBatchSize(int size);

   long getMaxBatchTime();

   void setMaxBatchTime(long time);

   // ---- subscription identity ----

   String getSubscriptionName();

   void setSubscriptionName(String subname);

   String getClientID();

   void setClientID(String clientID);

   // ---- transaction manager lookup ----

   String getTransactionManagerLocatorClass();

   void setTransactionManagerLocatorClass(String transactionManagerLocatorClass);

   String getTransactionManagerLocatorMethod();

   void setTransactionManagerLocatorMethod(String transactionManagerLocatorMethod);

   // ---- message id header ----

   boolean isAddMessageIDInHeader();

   void setAddMessageIDInHeader(boolean value);

   // ---- state ----

   boolean isPaused();

   boolean isFailed();

}

/*
 * ---- next file in this archive (its diff header continues on the following archive line) ----
 * http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/QualityOfServiceMode.java
 * ----------------------------------------------------------------------
 * diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/QualityOfServiceMode.java
 */
b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/QualityOfServiceMode.java new file mode 100644 index 0000000..fc02211 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/QualityOfServiceMode.java @@ -0,0 +1,100 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.bridge; + +/** + * <h3>Quality of server (QoS) levels</h3> + * + * <h4>QOS_AT_MOST_ONCE</h4> + * + * With this QoS mode messages will reach the destination from the source at + * most once. The messages are consumed from the source and acknowledged before + * sending to the destination. Therefore there is a possibility that if failure + * occurs between removing them from the source and them arriving at the + * destination they could be lost. Hence delivery will occur at most once. This + * mode is available for both persistent and non persistent messages. + * + * <h4>QOS_DUPLICATES_OK</h4> + * + * With this QoS mode, the messages are consumed from the source and then + * acknowledged after they have been successfully sent to the destination. + * Therefore there is a possibility that if failure occurs after sending to the + * destination but before acknowledging them, they could be sent again when the + * system recovers. I.e. the destination might receive duplicates after a + * failure. 
This mode is available for both persistent and non persistent + * messages. + * + * <h4>QOS_ONCE_AND_ONLY_ONCE</h4> + * + * This QoS mode ensures messages will reach the destination from the source + * once and only once. (Sometimes this mode is known as "exactly once"). If both + * the source and the destination are on the same HornetQ server + * instance then this can be achieved by sending and acknowledging the messages + * in the same local transaction. If the source and destination are on different + * servers this is achieved by enlisting the sending and consuming sessions in a + * JTA transaction. The JTA transaction is controlled by JBoss Transactions JTA + * implementation which is a fully recovering transaction manager, thus + * providing a very high degree of durability. If JTA is required then both + * supplied connection factories need to be XAConnectionFactory implementations. + * This mode is only available for persistent messages. This is likely to be the + * slowest mode since it requires extra persistence for the transaction logging. + * + * Note: For a specific application it may possible to provide once and only + * once semantics without using the QOS_ONCE_AND_ONLY_ONCE QoS level. This can + * be done by using the QOS_DUPLICATES_OK mode and then checking for duplicates + * at the destination and discarding them. Some JMS servers provide automatic + * duplicate message detection functionality, or this may be possible to + * implement on the application level by maintaining a cache of received message + * ids on disk and comparing received messages to them. The cache would only be + * valid for a certain period of time so this approach is not as watertight as + * using QOS_ONCE_AND_ONLY_ONCE but may be a good choice depending on your + * specific application. 
+ * + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * + * + */ +public enum QualityOfServiceMode +{ + AT_MOST_ONCE(0), DUPLICATES_OK(1), ONCE_AND_ONLY_ONCE(2); + + private final int value; + + QualityOfServiceMode(final int value) + { + this.value = value; + } + + public int intValue() + { + return value; + } + + public static QualityOfServiceMode valueOf(final int value) + { + if (value == AT_MOST_ONCE.value) + { + return AT_MOST_ONCE; + } + if (value == DUPLICATES_OK.value) + { + return DUPLICATES_OK; + } + if (value == ONCE_AND_ONLY_ONCE.value) + { + return ONCE_AND_ONLY_ONCE; + } + throw new IllegalArgumentException("invalid QualityOfServiceMode value: " + value); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/impl/JMSBridgeControlImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/impl/JMSBridgeControlImpl.java b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/impl/JMSBridgeControlImpl.java new file mode 100644 index 0000000..281dcd4 --- /dev/null +++ b/activemq6-jms-server/src/main/java/org/apache/activemq6/jms/bridge/impl/JMSBridgeControlImpl.java @@ -0,0 +1,256 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
/* End of the license header of JMSBridgeControlImpl.java (the header text precedes this point in the archive). */
package org.apache.activemq6.jms.bridge.impl;

import javax.management.StandardMBean;

import org.apache.activemq6.jms.bridge.JMSBridge;
import org.apache.activemq6.jms.bridge.JMSBridgeControl;
import org.apache.activemq6.jms.bridge.QualityOfServiceMode;

/**
 * A JMSBridgeControlImpl: exposes a {@link JMSBridge} as a {@link StandardMBean} whose
 * management interface is {@link JMSBridgeControl}.
 * <p>
 * Every operation and attribute simply delegates to the wrapped bridge. The only
 * translation performed here is for the quality of service mode, which the management
 * interface exposes as the enum constant name rather than as {@link QualityOfServiceMode}.
 *
 * @author <a href="[email protected]">Jose de Castro</a>
 */
public class JMSBridgeControlImpl extends StandardMBean implements JMSBridgeControl
{

   /** The bridge every call is forwarded to. */
   private final JMSBridge bridge;

   /**
    * Creates a control for the given bridge.
    *
    * @param bridge the bridge to manage
    * @throws Exception if the {@link StandardMBean} constructor rejects {@link JMSBridgeControl}
    *                   as a management interface
    */
   public JMSBridgeControlImpl(final JMSBridge bridge) throws Exception
   {
      super(JMSBridgeControl.class);
      this.bridge = bridge;
   }

   // ---- lifecycle operations ----

   /** Delegates to {@link JMSBridge#pause()}. */
   public void pause() throws Exception
   {
      bridge.pause();
   }

   /** Delegates to {@link JMSBridge#resume()}. */
   public void resume() throws Exception
   {
      bridge.resume();
   }

   /** @return whatever the bridge reports from {@code isStarted()} */
   public boolean isStarted()
   {
      return bridge.isStarted();
   }

   /** Delegates to the bridge's {@code start()}. */
   public void start() throws Exception
   {
      bridge.start();
   }

   /** Delegates to the bridge's {@code stop()}. */
   public void stop() throws Exception
   {
      bridge.stop();
   }

   // ---- attribute getters ----

   /** @return the bridge's client ID */
   public String getClientID()
   {
      return bridge.getClientID();
   }

   /** @return the bridge's failure retry interval */
   public long getFailureRetryInterval()
   {
      return bridge.getFailureRetryInterval();
   }

   /** @return the bridge's maximum batch size */
   public int getMaxBatchSize()
   {
      return bridge.getMaxBatchSize();
   }

   /** @return the bridge's maximum batch time */
   public long getMaxBatchTime()
   {
      return bridge.getMaxBatchTime();
   }

   /** @return the bridge's maximum number of retries */
   public int getMaxRetries()
   {
      return bridge.getMaxRetries();
   }

   /**
    * @return the name of the bridge's {@link QualityOfServiceMode} constant, or {@code null}
    *         if the bridge has no mode set
    */
   public String getQualityOfServiceMode()
   {
      final QualityOfServiceMode mode = bridge.getQualityOfServiceMode();
      return mode != null ? mode.name() : null;
   }

   /** @return the bridge's message selector */
   public String getSelector()
   {
      return bridge.getSelector();
   }

   /**
    * @return the bridge's source password
    */
   // NOTE(review): this returns the value the bridge holds, so the password becomes readable
   // through this MBean attribute - confirm that exposing it over JMX is intended.
   public String getSourcePassword()
   {
      return bridge.getSourcePassword();
   }

   /** @return the bridge's source user name */
   public String getSourceUsername()
   {
      return bridge.getSourceUsername();
   }

   /** @return the bridge's subscription name */
   public String getSubscriptionName()
   {
      return bridge.getSubscriptionName();
   }

   /**
    * @return the bridge's target password
    */
   // NOTE(review): same exposure concern as getSourcePassword() - confirm it is intended.
   public String getTargetPassword()
   {
      return bridge.getTargetPassword();
   }

   /** @return the bridge's target user name */
   public String getTargetUsername()
   {
      return bridge.getTargetUsername();
   }

   /** @return the bridge's transaction manager locator class name */
   public String getTransactionManagerLocatorClass()
   {
      return bridge.getTransactionManagerLocatorClass();
   }

   /** @return the bridge's transaction manager locator method name */
   public String getTransactionManagerLocatorMethod()
   {
      return bridge.getTransactionManagerLocatorMethod();
   }

   /** @return the bridge's "add message ID in header" flag */
   public boolean isAddMessageIDInHeader()
   {
      return bridge.isAddMessageIDInHeader();
   }

   /** @return whatever the bridge reports from {@link JMSBridge#isFailed()} */
   public boolean isFailed()
   {
      return bridge.isFailed();
   }

   /** @return whatever the bridge reports from {@link JMSBridge#isPaused()} */
   public boolean isPaused()
   {
      return bridge.isPaused();
   }

   // ---- attribute setters ----

   /** @param value forwarded to {@link JMSBridge#setAddMessageIDInHeader(boolean)} */
   public void setAddMessageIDInHeader(final boolean value)
   {
      bridge.setAddMessageIDInHeader(value);
   }

   /** @param clientID forwarded to {@link JMSBridge#setClientID(String)} */
   public void setClientID(final String clientID)
   {
      bridge.setClientID(clientID);
   }

   /** @param interval forwarded to {@link JMSBridge#setFailureRetryInterval(long)} */
   public void setFailureRetryInterval(final long interval)
   {
      bridge.setFailureRetryInterval(interval);
   }

   /** @param size forwarded to {@link JMSBridge#setMaxBatchSize(int)} */
   public void setMaxBatchSize(final int size)
   {
      bridge.setMaxBatchSize(size);
   }

   /** @param time forwarded to {@link JMSBridge#setMaxBatchTime(long)} */
   public void setMaxBatchTime(final long time)
   {
      bridge.setMaxBatchTime(time);
   }

   /** @param retries forwarded to {@link JMSBridge#setMaxRetries(int)} */
   public void setMaxRetries(final int retries)
   {
      bridge.setMaxRetries(retries);
   }

   /**
    * Sets the bridge's quality of service mode from the name of a
    * {@link QualityOfServiceMode} constant.
    *
    * @param mode the constant name (e.g. {@code "DUPLICATES_OK"}); {@code null} is ignored
    *             and leaves the bridge's current mode untouched
    * @throws IllegalArgumentException if {@code mode} is not the name of a constant
    */
   public void setQualityOfServiceMode(final String mode)
   {
      // The original else-branch only reassigned the parameter, which had no effect;
      // a null mode has always been a no-op and stays one.
      if (mode != null)
      {
         bridge.setQualityOfServiceMode(QualityOfServiceMode.valueOf(mode));
      }
   }

   /** @param selector forwarded to {@link JMSBridge#setSelector(String)} */
   public void setSelector(final String selector)
   {
      bridge.setSelector(selector);
   }

   /** @param pwd forwarded to {@link JMSBridge#setSourcePassword(String)} */
   public void setSourcePassword(final String pwd)
   {
      bridge.setSourcePassword(pwd);
   }

   /** @param name forwarded to {@link JMSBridge#setSourceUsername(String)} */
   public void setSourceUsername(final String name)
   {
      bridge.setSourceUsername(name);
   }

   /** @param subname forwarded to {@link JMSBridge#setSubscriptionName(String)} */
   public void setSubscriptionName(final String subname)
   {
      bridge.setSubscriptionName(subname);
   }

   /** @param pwd forwarded to {@link JMSBridge#setTargetPassword(String)} */
   public void setTargetPassword(final String pwd)
   {
      bridge.setTargetPassword(pwd);
   }

   /** @param name forwarded to {@link JMSBridge#setTargetUsername(String)} */
   public void setTargetUsername(final String name)
   {
      bridge.setTargetUsername(name);
   }

   /** @param transactionManagerLocatorClass forwarded to {@link JMSBridge#setTransactionManagerLocatorClass(String)} */
   public void setTransactionManagerLocatorClass(final String transactionManagerLocatorClass)
   {
      bridge.setTransactionManagerLocatorClass(transactionManagerLocatorClass);
   }

   /** @param transactionManagerLocatorMethod forwarded to {@link JMSBridge#setTransactionManagerLocatorMethod(String)} */
   public void setTransactionManagerLocatorMethod(final String transactionManagerLocatorMethod)
   {
      bridge.setTransactionManagerLocatorMethod(transactionManagerLocatorMethod);
   }

}
