http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java new file mode 100644 index 0000000..d618e42 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java @@ -0,0 +1,831 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.ra.inflow; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.resource.ResourceException; +import javax.resource.spi.endpoint.MessageEndpointFactory; +import javax.resource.spi.work.Work; +import javax.resource.spi.work.WorkManager; +import javax.transaction.xa.XAResource; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.ActiveMQExceptionType; +import org.apache.activemq.api.core.ActiveMQNonExistentQueueException; +import org.apache.activemq.api.core.ActiveMQNotConnectedException; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.api.core.client.ClientSessionFactory; +import org.apache.activemq.api.jms.ActiveMQJMSClient; +import org.apache.activemq.core.client.impl.ClientSessionInternal; +import org.apache.activemq.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.jms.client.ActiveMQDestination; +import org.apache.activemq.jms.server.recovery.XARecoveryConfig; +import org.apache.activemq.ra.ActiveMQRABundle; +import org.apache.activemq.ra.ActiveMQRAConnectionFactory; +import org.apache.activemq.ra.ActiveMQRALogger; +import org.apache.activemq.ra.ActiveMQRaUtils; +import org.apache.activemq.ra.ActiveMQResourceAdapter; +import org.apache.activemq.utils.FutureLatch; +import org.apache.activemq.utils.SensitiveDataCodec; + +/** + * The activation. 
+ * + * @author <a href="adr...@jboss.com">Adrian Brock</a> + * @author <a href="jesper.peder...@jboss.org">Jesper Pedersen</a> + * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a> + */ +public class ActiveMQActivation +{ + /** + * Trace enabled + */ + private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled(); + + /** + * The onMessage method + */ + public static final Method ONMESSAGE; + + /** + * The resource adapter + */ + private final ActiveMQResourceAdapter ra; + + /** + * The activation spec + */ + private final ActiveMQActivationSpec spec; + + /** + * The message endpoint factory + */ + private final MessageEndpointFactory endpointFactory; + + /** + * Whether delivery is active + */ + private final AtomicBoolean deliveryActive = new AtomicBoolean(false); + + /** + * The destination type + */ + private boolean isTopic = false; + + /** + * Is the delivery transacted + */ + private boolean isDeliveryTransacted; + + private ActiveMQDestination destination; + + /** + * The name of the temporary subscription name that all the sessions will share + */ + private SimpleString topicTemporaryQueue; + + private final List<ActiveMQMessageHandler> handlers = new ArrayList<ActiveMQMessageHandler>(); + + private ActiveMQConnectionFactory factory; + + // Whether we are in the failure recovery loop + private final AtomicBoolean inFailure = new AtomicBoolean(false); + private XARecoveryConfig resourceRecovery; + + static + { + try + { + ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[]{Message.class}); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + /** + * Constructor + * + * @param ra The resource adapter + * @param endpointFactory The endpoint factory + * @param spec The activation spec + * @throws ResourceException Thrown if an error occurs + */ + public ActiveMQActivation(final ActiveMQResourceAdapter ra, + final MessageEndpointFactory endpointFactory, + final ActiveMQActivationSpec spec) throws 
ResourceException + { + spec.validate(); + + if (ActiveMQActivation.trace) + { + ActiveMQRALogger.LOGGER.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")"); + } + + if (ra.isUseMaskedPassword()) + { + String pass = spec.getOwnPassword(); + if (pass != null) + { + SensitiveDataCodec<String> codec = ra.getCodecInstance(); + + try + { + spec.setPassword(codec.decode(pass)); + } + catch (Exception e) + { + throw new ResourceException(e); + } + } + } + + this.ra = ra; + this.endpointFactory = endpointFactory; + this.spec = spec; + try + { + isDeliveryTransacted = endpointFactory.isDeliveryTransacted(ActiveMQActivation.ONMESSAGE); + } + catch (Exception e) + { + throw new ResourceException(e); + } + } + + /** + * Get the activation spec + * + * @return The value + */ + public ActiveMQActivationSpec getActivationSpec() + { + if (ActiveMQActivation.trace) + { + ActiveMQRALogger.LOGGER.trace("getActivationSpec()"); + } + + return spec; + } + + /** + * Get the message endpoint factory + * + * @return The value + */ + public MessageEndpointFactory getMessageEndpointFactory() + { + if (ActiveMQActivation.trace) + { + ActiveMQRALogger.LOGGER.trace("getMessageEndpointFactory()"); + } + + return endpointFactory; + } + + /** + * Get whether delivery is transacted + * + * @return The value + */ + public boolean isDeliveryTransacted() + { + if (ActiveMQActivation.trace) + { + ActiveMQRALogger.LOGGER.trace("isDeliveryTransacted()"); + } + + return isDeliveryTransacted; + } + + /** + * Get the work manager + * + * @return The value + */ + public WorkManager getWorkManager() + { + if (ActiveMQActivation.trace) + { + ActiveMQRALogger.LOGGER.trace("getWorkManager()"); + } + + return ra.getWorkManager(); + } + + /** + * Is the destination a topic + * + * @return The value + */ + public boolean isTopic() + { + if (ActiveMQActivation.trace) + { + ActiveMQRALogger.LOGGER.trace("isTopic()"); + } + + return isTopic; + } + + /** + * Start the activation + * + * @throws 
ResourceException Thrown if an error occurs + */ + public void start() throws ResourceException + { + if (ActiveMQActivation.trace) + { + ActiveMQRALogger.LOGGER.trace("start()"); + } + deliveryActive.set(true); + ra.getWorkManager().scheduleWork(new SetupActivation()); + } + + /** + * @return the topicTemporaryQueue + */ + public SimpleString getTopicTemporaryQueue() + { + return topicTemporaryQueue; + } + + /** + * @param topicTemporaryQueue the topicTemporaryQueue to set + */ + public void setTopicTemporaryQueue(SimpleString topicTemporaryQueue) + { + this.topicTemporaryQueue = topicTemporaryQueue; + } + + /** + * @return the list of XAResources for this activation endpoint + */ + public List<XAResource> getXAResources() + { + List<XAResource> xaresources = new ArrayList<XAResource>(); + for (ActiveMQMessageHandler handler : handlers) + { + XAResource xares = handler.getXAResource(); + if (xares != null) + { + xaresources.add(xares); + } + } + return xaresources; + } + + /** + * Stop the activation + */ + public void stop() + { + if (ActiveMQActivation.trace) + { + ActiveMQRALogger.LOGGER.trace("stop()"); + } + + deliveryActive.set(false); + teardown(); + } + + /** + * Setup the activation + * + * @throws Exception Thrown if an error occurs + */ + protected synchronized void setup() throws Exception + { + ActiveMQRALogger.LOGGER.debug("Setting up " + spec); + + setupCF(); + + setupDestination(); + + Exception firstException = null; + + for (int i = 0; i < spec.getMaxSession(); i++) + { + ClientSessionFactory cf = null; + ClientSession session = null; + + try + { + cf = factory.getServerLocator().createSessionFactory(); + session = setupSession(cf); + ActiveMQMessageHandler handler = new ActiveMQMessageHandler(this, ra.getTM(), (ClientSessionInternal) session, cf, i); + handler.setup(); + handlers.add(handler); + } + catch (Exception e) + { + if (cf != null) + { + cf.close(); + } + if (session != null) + { + session.close(); + } + if (firstException == null) + { 
+ firstException = e; + } + } + } + //if we have any exceptions close all the handlers and throw the first exception. + //we don't want partially configured activations, i.e. only 8 out of 15 sessions started so best to stop and log the error. + if (firstException != null) + { + for (ActiveMQMessageHandler handler : handlers) + { + handler.teardown(); + } + throw firstException; + } + + //now start them all together. + for (ActiveMQMessageHandler handler : handlers) + { + handler.start(); + } + + resourceRecovery = ra.getRecoveryManager().register(factory, spec.getUser(), spec.getPassword()); + + ActiveMQRALogger.LOGGER.debug("Setup complete " + this); + } + + /** + * Teardown the activation + */ + protected synchronized void teardown() + { + ActiveMQRALogger.LOGGER.debug("Tearing down " + spec); + + if (resourceRecovery != null) + { + ra.getRecoveryManager().unRegister(resourceRecovery); + } + + final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()]; + + // We need to do from last to first as any temporary queue will have been created on the first handler + // So we invert the handlers here + for (int i = 0; i < handlers.size(); i++) + { + // The index here is the complimentary so it's inverting the array + handlersCopy[i] = handlers.get(handlers.size() - i - 1); + } + + handlers.clear(); + + FutureLatch future = new FutureLatch(handlersCopy.length); + List<Thread> interruptThreads = new ArrayList<>(); + for (ActiveMQMessageHandler handler : handlersCopy) + { + Thread thread = handler.interruptConsumer(future); + if (thread != null) + { + interruptThreads.add(thread); + } + } + + //wait for all the consumers to complete any onmessage calls + boolean stuckThreads = !future.await(factory.getCallTimeout()); + //if any are stuck then we need to interrupt them + if (stuckThreads) + { + for (Thread interruptThread : interruptThreads) + { + try + { + interruptThread.interrupt(); + } + catch (Exception e) + { + //ok + } + } + } + + 
Thread threadTearDown = new Thread("TearDown/ActiveMQActivation") + { + public void run() + { + for (ActiveMQMessageHandler handler : handlersCopy) + { + handler.teardown(); + } + } + }; + + // We will first start a new thread that will call tearDown on all the instances, trying to graciously shutdown everything. + // We will then use the call-timeout to determine a timeout. + // if that failed we will then close the connection factory, and interrupt the thread + threadTearDown.start(); + + try + { + threadTearDown.join(factory.getCallTimeout()); + } + catch (InterruptedException e) + { + // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up + } + + if (threadTearDown.isAlive()) + { + if (factory != null) + { + // This will interrupt any threads waiting on reconnect + factory.close(); + factory = null; + } + threadTearDown.interrupt(); + + try + { + threadTearDown.join(5000); + } + catch (InterruptedException e) + { + // nothing to be done here.. 
we are going down anyways + } + + if (threadTearDown.isAlive()) + { + ActiveMQRALogger.LOGGER.warn("Thread " + threadTearDown + " couldn't be finished"); + } + } + + if (spec.isHasBeenUpdated() && factory != null) + { + factory.close(); + factory = null; + } + + + ActiveMQRALogger.LOGGER.debug("Tearing down complete " + this); + } + + protected void setupCF() throws Exception + { + if (spec.getConnectionFactoryLookup() != null) + { + Context ctx; + if (spec.getParsedJndiParams() == null) + { + ctx = new InitialContext(); + } + else + { + ctx = new InitialContext(spec.getParsedJndiParams()); + } + Object fac = ctx.lookup(spec.getConnectionFactoryLookup()); + if (fac instanceof ActiveMQConnectionFactory) + { + factory = (ActiveMQConnectionFactory) fac; + } + else + { + ActiveMQRAConnectionFactory raFact = (ActiveMQRAConnectionFactory) fac; + if (spec.isHasBeenUpdated()) + { + factory = raFact.getResourceAdapter().createActiveMQConnectionFactory(spec); + } + else + { + factory = raFact.getDefaultFactory(); + if (factory != ra.getDefaultActiveMQConnectionFactory()) + { + ActiveMQRALogger.LOGGER.warnDifferentConnectionfactory(); + } + } + } + } + else if (spec.isHasBeenUpdated()) + { + factory = ra.createActiveMQConnectionFactory(spec); + } + else + { + factory = ra.getDefaultActiveMQConnectionFactory(); + } + } + + /** + * Setup a session + * + * @param cf + * @return The connection + * @throws Exception Thrown if an error occurs + */ + protected ClientSession setupSession(ClientSessionFactory cf) throws Exception + { + ClientSession result = null; + + try + { + result = ra.createSession(cf, + spec.getAcknowledgeModeInt(), + spec.getUser(), + spec.getPassword(), + ra.getPreAcknowledge(), + ra.getDupsOKBatchSize(), + ra.getTransactionBatchSize(), + isDeliveryTransacted, + spec.isUseLocalTx(), + spec.getTransactionTimeout()); + + result.addMetaData("resource-adapter", "inbound"); + result.addMetaData("jms-session", ""); + String clientID = ra.getClientID() == null ? 
spec.getClientID() : ra.getClientID(); + if (clientID != null) + { + result.addMetaData("jms-client-id", clientID); + } + + ActiveMQRALogger.LOGGER.debug("Using queue connection " + result); + + return result; + } + catch (Throwable t) + { + try + { + if (result != null) + { + result.close(); + } + } + catch (Exception e) + { + ActiveMQRALogger.LOGGER.trace("Ignored error closing connection", e); + } + if (t instanceof Exception) + { + throw (Exception) t; + } + throw new RuntimeException("Error configuring connection", t); + } + } + + public SimpleString getAddress() + { + return destination.getSimpleAddress(); + } + + protected void setupDestination() throws Exception + { + + String destinationName = spec.getDestination(); + + if (spec.isUseJNDI()) + { + Context ctx; + if (spec.getParsedJndiParams() == null) + { + ctx = new InitialContext(); + } + else + { + ctx = new InitialContext(spec.getParsedJndiParams()); + } + ActiveMQRALogger.LOGGER.debug("Using context " + ctx.getEnvironment() + " for " + spec); + if (ActiveMQActivation.trace) + { + ActiveMQRALogger.LOGGER.trace("setupDestination(" + ctx + ")"); + } + + String destinationTypeString = spec.getDestinationType(); + if (destinationTypeString != null && !destinationTypeString.trim().equals("")) + { + ActiveMQRALogger.LOGGER.debug("Destination type defined as " + destinationTypeString); + + Class<?> destinationType; + if (Topic.class.getName().equals(destinationTypeString)) + { + destinationType = Topic.class; + isTopic = true; + } + else + { + destinationType = Queue.class; + } + + ActiveMQRALogger.LOGGER.debug("Retrieving " + destinationType.getName() + " \"" + destinationName + "\" from JNDI"); + + try + { + destination = (ActiveMQDestination) ActiveMQRaUtils.lookup(ctx, destinationName, destinationType); + } + catch (Exception e) + { + if (destinationName == null) + { + throw ActiveMQRABundle.BUNDLE.noDestinationName(); + } + + String calculatedDestinationName = 
destinationName.substring(destinationName.lastIndexOf('/') + 1); + + ActiveMQRALogger.LOGGER.debug("Unable to retrieve " + destinationName + + " from JNDI. Creating a new " + destinationType.getName() + + " named " + calculatedDestinationName + " to be used by the MDB."); + + // If there is no binding on naming, we will just create a new instance + if (isTopic) + { + destination = (ActiveMQDestination) ActiveMQJMSClient.createTopic(calculatedDestinationName); + } + else + { + destination = (ActiveMQDestination) ActiveMQJMSClient.createQueue(calculatedDestinationName); + } + } + } + else + { + ActiveMQRALogger.LOGGER.debug("Destination type not defined in MDB activation configuration."); + ActiveMQRALogger.LOGGER.debug("Retrieving " + Destination.class.getName() + " \"" + destinationName + "\" from JNDI"); + + destination = (ActiveMQDestination) ActiveMQRaUtils.lookup(ctx, destinationName, Destination.class); + if (destination instanceof Topic) + { + isTopic = true; + } + } + } + else + { + ActiveMQRALogger.LOGGER.instantiatingDestination(spec.getDestinationType(), spec.getDestination()); + + if (Topic.class.getName().equals(spec.getDestinationType())) + { + destination = (ActiveMQDestination) ActiveMQJMSClient.createTopic(spec.getDestination()); + isTopic = true; + } + else + { + destination = (ActiveMQDestination) ActiveMQJMSClient.createQueue(spec.getDestination()); + } + } + } + + /** + * Get a string representation + * + * @return The value + */ + @Override + public String toString() + { + StringBuffer buffer = new StringBuffer(); + buffer.append(ActiveMQActivation.class.getName()).append('('); + buffer.append("spec=").append(spec.getClass().getName()); + buffer.append(" mepf=").append(endpointFactory.getClass().getName()); + buffer.append(" active=").append(deliveryActive.get()); + if (spec.getDestination() != null) + { + buffer.append(" destination=").append(spec.getDestination()); + } + buffer.append(" transacted=").append(isDeliveryTransacted); + 
buffer.append(')'); + return buffer.toString(); + } + + /** + * Handles any failure by trying to reconnect + * + * @param failure the reason for the failure + */ + public void handleFailure(Throwable failure) + { + if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) + { + ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination()); + } + else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.NOT_CONNECTED) + { + ActiveMQRALogger.LOGGER.awaitingJMSServerCreation(); + } + else + { + ActiveMQRALogger.LOGGER.failureInActivation(failure, spec); + } + int reconnectCount = 0; + int setupAttempts = spec.getSetupAttempts(); + long setupInterval = spec.getSetupInterval(); + + // Only enter the failure loop once + if (inFailure.getAndSet(true)) + return; + try + { + Throwable lastException = failure; + while (deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts)) + { + teardown(); + + try + { + Thread.sleep(setupInterval); + } + catch (InterruptedException e) + { + ActiveMQRALogger.LOGGER.debug("Interrupted trying to reconnect " + spec, e); + break; + } + + if (reconnectCount < 1) + { + ActiveMQRALogger.LOGGER.attemptingReconnect(spec); + } + try + { + setup(); + ActiveMQRALogger.LOGGER.reconnected(); + break; + } + catch (Throwable t) + { + if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) + { + if (lastException == null || !(t instanceof ActiveMQNonExistentQueueException)) + { + lastException = t; + ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination()); + } + } + else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.NOT_CONNECTED) + { + if (lastException == null || !(t instanceof ActiveMQNotConnectedException)) 
+ { + lastException = t; + ActiveMQRALogger.LOGGER.awaitingJMSServerCreation(); + } + } + else + { + ActiveMQRALogger.LOGGER.errorReconnecting(t, spec); + } + } + ++reconnectCount; + } + } + finally + { + // Leaving failure recovery loop + inFailure.set(false); + } + } + + public ActiveMQConnectionFactory getConnectionFactory() + { + return this.factory; + } + + /** + * Handles the setup + */ + private class SetupActivation implements Work + { + public void run() + { + try + { + setup(); + } + catch (Throwable t) + { + handleFailure(t); + } + } + + public void release() + { + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivationSpec.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivationSpec.java b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivationSpec.java new file mode 100644 index 0000000..06f9c7a --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivationSpec.java @@ -0,0 +1,945 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.ra.inflow; + +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import javax.resource.ResourceException; +import javax.resource.spi.ActivationSpec; +import javax.resource.spi.InvalidPropertyException; +import javax.resource.spi.ResourceAdapter; +import java.beans.IntrospectionException; +import java.beans.PropertyDescriptor; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; + +import org.apache.activemq.ra.ConnectionFactoryProperties; +import org.apache.activemq.ra.ActiveMQRALogger; +import org.apache.activemq.ra.ActiveMQRaUtils; +import org.apache.activemq.ra.ActiveMQResourceAdapter; + +/** + * The activation spec + * These properties are set on the MDB ActivactionProperties + * + * @author <a href="adr...@jboss.com">Adrian Brock</a> + * @author <a href="jesper.peder...@jboss.org">Jesper Pedersen</a> + * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a> + * @author <a href="mailto:clebert.suco...@jboss.org">Clebert Suconic</a> + */ +public class ActiveMQActivationSpec extends ConnectionFactoryProperties implements ActivationSpec, Serializable +{ + private static final long serialVersionUID = -7997041053897964654L; + + private static final int DEFAULT_MAX_SESSION = 15; + + /** + * Whether trace is enabled + */ + private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled(); + + public String strConnectorClassName; + + public String strConnectionParameters; + + /** + * The resource adapter + */ + private ActiveMQResourceAdapter ra; + + /** + * The connection factory lookup + */ + private String connectionFactoryLookup; + + /** + * The destination + */ + private String destination; + + /** + * The destination type + */ + private String destinationType; + + /** + * The message selector + */ + private String messageSelector; + + /** + * The acknowledgement mode + */ + private int acknowledgeMode; + 
+ /** + * The subscription durability + */ + private boolean subscriptionDurability; + + /** + * The subscription name + */ + private String subscriptionName; + + /** + * If this is true, a durable subscription could be shared by multiple MDB instances + */ + private boolean shareSubscriptions; + + /** + * The user + */ + private String user; + + /** + * The password + */ + private String password; + + /** + * The maximum number of sessions + */ + private Integer maxSession; + + /** + * Transaction timeout + */ + private Integer transactionTimeout; + + private Boolean useJNDI = true; + + private String jndiParams = null; + + private Hashtable parsedJndiParams; + + /* use local tx instead of XA*/ + private Boolean localTx; + + // undefined by default, default is specified at the RA level in ActiveMQRAProperties + private Integer setupAttempts; + + // undefined by default, default is specified at the RA level in ActiveMQRAProperties + private Long setupInterval; + + /** + * Constructor + */ + public ActiveMQActivationSpec() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("constructor()"); + } + + ra = null; + destination = null; + destinationType = null; + messageSelector = null; + acknowledgeMode = Session.AUTO_ACKNOWLEDGE; + subscriptionDurability = false; + subscriptionName = null; + user = null; + password = null; + maxSession = DEFAULT_MAX_SESSION; + transactionTimeout = 0; + } + + /** + * Get the resource adapter + * + * @return The resource adapter + */ + public ResourceAdapter getResourceAdapter() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getResourceAdapter()"); + } + + return ra; + } + + /** + * @return the useJNDI + */ + public boolean isUseJNDI() + { + if (useJNDI == null) + { + return ra.isUseJNDI(); + } + return useJNDI; + } + + /** + * @param value the useJNDI to set + */ + public void setUseJNDI(final boolean value) + { + useJNDI = value; + } + + /** + * @return return the jndi params to 
use + */ + public String getJndiParams() + { + if (jndiParams == null) + { + return ra.getJndiParams(); + } + return jndiParams; + } + + public void setJndiParams(String jndiParams) + { + this.jndiParams = jndiParams; + parsedJndiParams = ActiveMQRaUtils.parseHashtableConfig(jndiParams); + } + + public Hashtable<?, ?> getParsedJndiParams() + { + if (parsedJndiParams == null) + { + return ra.getParsedJndiParams(); + } + return parsedJndiParams; + } + + /** + * Set the resource adapter + * + * @param ra The resource adapter + * @throws ResourceException Thrown if incorrect resource adapter + */ + public void setResourceAdapter(final ResourceAdapter ra) throws ResourceException + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setResourceAdapter(" + ra + ")"); + } + + if (ra == null || !(ra instanceof ActiveMQResourceAdapter)) + { + throw new ResourceException("Resource adapter is " + ra); + } + + this.ra = (ActiveMQResourceAdapter) ra; + } + + /** + * Get the connection factory lookup + * + * @return The value + */ + public String getConnectionFactoryLookup() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getConnectionFactoryLookup() ->" + connectionFactoryLookup); + } + + return connectionFactoryLookup; + } + + /** + * Set the connection factory lookup + * + * @param value The value + */ + public void setConnectionFactoryLookup(final String value) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setConnectionFactoryLookup(" + value + ")"); + } + + connectionFactoryLookup = value; + } + + /** + * Get the destination + * + * @return The value + */ + public String getDestination() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getDestination()"); + } + + return destination; + } + + /** + * Set the destination + * + * @param value The value + */ + public void setDestination(final String value) + { + if (ActiveMQActivationSpec.trace) + { + 
ActiveMQRALogger.LOGGER.trace("setDestination(" + value + ")"); + } + + destination = value; + } + + /** + * Get the destination lookup + * + * @return The value + */ + public String getDestinationLookup() + { + return getDestination(); + } + + /** + * Set the destination + * + * @param value The value + */ + public void setDestinationLookup(final String value) + { + setDestination(value); + setUseJNDI(true); + } + + /** + * Get the destination type + * + * @return The value + */ + public String getDestinationType() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getDestinationType()"); + } + + return destinationType; + } + + /** + * Set the destination type + * + * @param value The value + */ + public void setDestinationType(final String value) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setDestinationType(" + value + ")"); + } + + destinationType = value; + } + + /** + * Get the message selector + * + * @return The value + */ + public String getMessageSelector() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getMessageSelector()"); + } + + return messageSelector; + } + + /** + * Set the message selector + * + * @param value The value + */ + public void setMessageSelector(final String value) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setMessageSelector(" + value + ")"); + } + + messageSelector = value; + } + + /** + * Get the acknowledge mode + * + * @return The value + */ + public String getAcknowledgeMode() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getAcknowledgeMode()"); + } + + if (Session.DUPS_OK_ACKNOWLEDGE == acknowledgeMode) + { + return "Dups-ok-acknowledge"; + } + else + { + return "Auto-acknowledge"; + } + } + + /** + * Set the acknowledge mode + * + * @param value The value + */ + public void setAcknowledgeMode(final String value) + { + if (ActiveMQActivationSpec.trace) + { + 
ActiveMQRALogger.LOGGER.trace("setAcknowledgeMode(" + value + ")"); + } + + if ("DUPS_OK_ACKNOWLEDGE".equalsIgnoreCase(value) || "Dups-ok-acknowledge".equalsIgnoreCase(value)) + { + acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE; + } + else if ("AUTO_ACKNOWLEDGE".equalsIgnoreCase(value) || "Auto-acknowledge".equalsIgnoreCase(value)) + { + acknowledgeMode = Session.AUTO_ACKNOWLEDGE; + } + else + { + throw new IllegalArgumentException("Unsupported acknowledgement mode " + value); + } + } + + /** + * @return the acknowledgement mode + */ + public int getAcknowledgeModeInt() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getAcknowledgeMode()"); + } + + return acknowledgeMode; + } + + /** + * Get the subscription durability + * + * @return The value + */ + public String getSubscriptionDurability() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getSubscriptionDurability()"); + } + + if (subscriptionDurability) + { + return "Durable"; + } + else + { + return "NonDurable"; + } + } + + /** + * Set the subscription durability + * + * @param value The value + */ + public void setSubscriptionDurability(final String value) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setSubscriptionDurability(" + value + ")"); + } + + subscriptionDurability = "Durable".equals(value); + } + + /** + * Get the status of subscription durability + * + * @return The value + */ + public boolean isSubscriptionDurable() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("isSubscriptionDurable()"); + } + + return subscriptionDurability; + } + + /** + * Get the subscription name + * + * @return The value + */ + public String getSubscriptionName() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getSubscriptionName()"); + } + + return subscriptionName; + } + + /** + * Set the subscription name + * + * @param value The value + */ + public void 
setSubscriptionName(final String value) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setSubscriptionName(" + value + ")"); + } + + subscriptionName = value; + } + + + /** + * @return the shareDurableSubscriptions + */ + public boolean isShareSubscriptions() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("isShareSubscriptions() = " + shareSubscriptions); + } + + return shareSubscriptions; + } + + /** + * @param shareSubscriptions the shareDurableSubscriptions to set + */ + public void setShareSubscriptions(boolean shareSubscriptions) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setShareSubscriptions(" + shareSubscriptions + ")"); + } + + this.shareSubscriptions = shareSubscriptions; + } + + /** + * Get the user + * + * @return The value + */ + public String getUser() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getUser()"); + } + + if (user == null) + { + return ra.getUserName(); + } + else + { + return user; + } + } + + /** + * Set the user + * + * @param value The value + */ + public void setUser(final String value) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setUser(" + value + ")"); + } + + user = value; + } + + /** + * Get the password + * + * @return The value + */ + public String getPassword() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getPassword()"); + } + + if (password == null) + { + return ra.getPassword(); + } + else + { + return password; + } + } + + public String getOwnPassword() + { + return password; + } + + /** + * Set the password + * + * @param value The value + */ + public void setPassword(final String value) throws Exception + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setPassword(****)"); + } + + password = value; + } + + /** + * Get the number of max session + * + * @return The value + */ + public Integer getMaxSession() + 
{ + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getMaxSession()"); + } + + if (maxSession == null) + { + return DEFAULT_MAX_SESSION; + } + + return maxSession; + } + + /** + * Set the number of max session + * + * @param value The value + */ + public void setMaxSession(final Integer value) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setMaxSession(" + value + ")"); + } + + maxSession = value; + } + + /** + * Get the transaction timeout + * + * @return The value + */ + public Integer getTransactionTimeout() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getTransactionTimeout()"); + } + + return transactionTimeout; + } + + /** + * Set the transaction timeout + * + * @param value The value + */ + public void setTransactionTimeout(final Integer value) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setTransactionTimeout(" + value + ")"); + } + + transactionTimeout = value; + } + + public Boolean isUseLocalTx() + { + if (localTx == null) + { + return ra.getUseLocalTx(); + } + else + { + return localTx; + } + } + + public void setUseLocalTx(final Boolean localTx) + { + this.localTx = localTx; + } + + public int getSetupAttempts() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getSetupAttempts()"); + } + + if (setupAttempts == null) + { + return ra.getSetupAttempts(); + } + else + { + return setupAttempts; + } + } + + public void setSetupAttempts(int setupAttempts) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setSetupAttempts(" + setupAttempts + ")"); + } + + this.setupAttempts = setupAttempts; + } + + public long getSetupInterval() + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("getSetupInterval()"); + } + + if (setupInterval == null) + { + return ra.getSetupInterval(); + } + else + { + return setupInterval; + } + } + + public void setSetupInterval(long 
setupInterval) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setSetupInterval(" + setupInterval + ")"); + } + + this.setupInterval = setupInterval; + } + + /** + * Validate + * + * @throws InvalidPropertyException Thrown if a validation exception occurs + */ + public void validate() throws InvalidPropertyException + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("validate()"); + } + + List<String> errorMessages = new ArrayList<String>(); + List<PropertyDescriptor> propsNotSet = new ArrayList<PropertyDescriptor>(); + + try + { + if (destination == null || destination.trim().equals("")) + { + propsNotSet.add(new PropertyDescriptor("destination", ActiveMQActivationSpec.class)); + errorMessages.add("Destination is mandatory."); + } + + if (destinationType != null && !Topic.class.getName().equals(destinationType) && !Queue.class.getName().equals(destinationType)) + { + propsNotSet.add(new PropertyDescriptor("destinationType", ActiveMQActivationSpec.class)); + errorMessages.add("If set, the destinationType must be either 'javax.jms.Topic' or 'javax.jms.Queue'."); + } + + if ((destinationType == null || destinationType.length() == 0 || Topic.class.getName().equals(destinationType)) && isSubscriptionDurable() && (subscriptionName == null || subscriptionName.length() == 0)) + { + propsNotSet.add(new PropertyDescriptor("subscriptionName", ActiveMQActivationSpec.class)); + errorMessages.add("If subscription is durable then subscription name must be specified."); + } + } + catch (IntrospectionException e) + { + e.printStackTrace(); + } + + if (propsNotSet.size() > 0) + { + StringBuffer b = new StringBuffer(); + b.append("Invalid settings:"); + for (Iterator<String> iter = errorMessages.iterator(); iter.hasNext();) + { + b.append(" "); + b.append(iter.next()); + } + InvalidPropertyException e = new InvalidPropertyException(b.toString()); + final PropertyDescriptor[] descriptors = propsNotSet.toArray(new 
PropertyDescriptor[propsNotSet.size()]); + e.setInvalidPropertyDescriptors(descriptors); + throw e; + } + } + + public String getConnectorClassName() + { + return strConnectorClassName; + } + + public void setConnectorClassName(final String connectorClassName) + { + if (ActiveMQActivationSpec.trace) + { + ActiveMQRALogger.LOGGER.trace("setConnectorClassName(" + connectorClassName + ")"); + } + + strConnectorClassName = connectorClassName; + + setParsedConnectorClassNames(ActiveMQRaUtils.parseConnectorConnectorConfig(connectorClassName)); + } + + /** + * @return the connectionParameters + */ + public String getConnectionParameters() + { + return strConnectionParameters; + } + + public void setConnectionParameters(final String configuration) + { + strConnectionParameters = configuration; + setParsedConnectionParameters(ActiveMQRaUtils.parseConfig(configuration)); + } + + /** + * Get a string representation + * + * @return The value + */ + @Override + public String toString() + { + StringBuffer buffer = new StringBuffer(); + buffer.append(ActiveMQActivationSpec.class.getName()).append('('); + buffer.append("ra=").append(ra); + if (messageSelector != null) + { + buffer.append(" connectionFactoryLookup=").append(connectionFactoryLookup); + } + buffer.append(" destination=").append(destination); + buffer.append(" destinationType=").append(destinationType); + if (messageSelector != null) + { + buffer.append(" selector=").append(messageSelector); + } + buffer.append(" ack=").append(getAcknowledgeMode()); + buffer.append(" durable=").append(subscriptionDurability); + buffer.append(" clientID=").append(getClientID()); + if (subscriptionName != null) + { + buffer.append(" subscription=").append(subscriptionName); + } + buffer.append(" user=").append(user); + if (password != null) + { + buffer.append(" password=").append("****"); + } + buffer.append(" maxSession=").append(maxSession); + buffer.append(')'); + return buffer.toString(); + } + + // here for backwards compatibilty 
+ public void setUseDLQ(final boolean b) + { + } + + public void setDLQJNDIName(final String name) + { + } + + public void setDLQHandler(final String handler) + { + } + + public void setDLQMaxResent(final int maxResent) + { + } + + public void setProviderAdapterJNDI(final String jndi) + { + } + + /** + * @param keepAlive the keepAlive to set + */ + public void setKeepAlive(boolean keepAlive) + { + } + + /** + * @param keepAliveMillis the keepAliveMillis to set + */ + public void setKeepAliveMillis(long keepAliveMillis) + { + } + + + public void setReconnectInterval(long interval) + { + } + + public void setMinSession(final Integer value) + { + } + + public void setMaxMessages(final Integer value) + { + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQMessageHandler.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQMessageHandler.java b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQMessageHandler.java new file mode 100644 index 0000000..abd68d0 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQMessageHandler.java @@ -0,0 +1,430 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq.ra.inflow; + +import javax.jms.MessageListener; +import javax.resource.ResourceException; +import javax.resource.spi.endpoint.MessageEndpoint; +import javax.resource.spi.endpoint.MessageEndpointFactory; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAResource; +import java.util.UUID; + +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientMessage; +import org.apache.activemq.api.core.client.ClientSession.QueueQuery; +import org.apache.activemq.api.core.client.ClientSessionFactory; +import org.apache.activemq.api.core.client.MessageHandler; +import org.apache.activemq.core.client.impl.ClientConsumerInternal; +import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.core.client.impl.ClientSessionInternal; +import org.apache.activemq.jms.client.ActiveMQDestination; +import org.apache.activemq.jms.client.ActiveMQMessage; +import org.apache.activemq.ra.ActiveMQRALogger; +import org.apache.activemq.ra.ActiveMQResourceAdapter; +import org.apache.activemq.ra.ActiveMQXAResourceWrapper; +import org.apache.activemq.utils.FutureLatch; + +/** + * The message handler + * + * @author <a href="adr...@jboss.com">Adrian Brock</a> + * @author <a href="mailto:jesper.peder...@jboss.org">Jesper Pedersen</a> + * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a> + * @author <a href="mailto:mtay...@redhat.com">Martyn Taylor</a> + */ +public class ActiveMQMessageHandler implements MessageHandler +{ + /** + * Trace enabled + */ + private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled(); + /** + * The session + */ + private final ClientSessionInternal session; + + private ClientConsumerInternal consumer; + + /** + * The endpoint + */ + private MessageEndpoint endpoint; + + private final ActiveMQActivation activation; + 
+ private boolean useLocalTx; + + private boolean transacted; + + private boolean useXA = false; + + private final int sessionNr; + + private final TransactionManager tm; + + private ClientSessionFactory cf; + + public ActiveMQMessageHandler(final ActiveMQActivation activation, + final TransactionManager tm, + final ClientSessionInternal session, + final ClientSessionFactory cf, + final int sessionNr) + { + this.activation = activation; + this.session = session; + this.cf = cf; + this.sessionNr = sessionNr; + this.tm = tm; + } + + public void setup() throws Exception + { + if (ActiveMQMessageHandler.trace) + { + ActiveMQRALogger.LOGGER.trace("setup()"); + } + + ActiveMQActivationSpec spec = activation.getActivationSpec(); + String selector = spec.getMessageSelector(); + + // Create the message consumer + SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector); + if (activation.isTopic() && spec.isSubscriptionDurable()) + { + SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, + spec.getClientID(), + spec.getSubscriptionName())); + + QueueQuery subResponse = session.queueQuery(queueName); + + if (!subResponse.isExists()) + { + session.createQueue(activation.getAddress(), queueName, selectorString, true); + } + else + { + // The check for already exists should be done only at the first session + // As a deployed MDB could set up multiple instances in order to process messages in parallel. 
+ if (sessionNr == 0 && subResponse.getConsumerCount() > 0) + { + if (!spec.isShareSubscriptions()) + { + throw new javax.jms.IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)"); + } + else if (ActiveMQRALogger.LOGGER.isDebugEnabled()) + { + ActiveMQRALogger.LOGGER.debug("the mdb on destination " + queueName + " already had " + + subResponse.getConsumerCount() + + " consumers but the MDB is configured to share subscriptions, so no exceptions are thrown"); + } + } + + SimpleString oldFilterString = subResponse.getFilterString(); + + boolean selectorChanged = selector == null && oldFilterString != null || + oldFilterString == null && + selector != null || + (oldFilterString != null && selector != null && !oldFilterString.toString() + .equals(selector)); + + SimpleString oldTopicName = subResponse.getAddress(); + + boolean topicChanged = !oldTopicName.equals(activation.getAddress()); + + if (selectorChanged || topicChanged) + { + // Delete the old durable sub + session.deleteQueue(queueName); + + // Create the new one + session.createQueue(activation.getAddress(), queueName, selectorString, true); + } + } + consumer = (ClientConsumerInternal) session.createConsumer(queueName, null, false); + } + else + { + SimpleString tempQueueName; + if (activation.isTopic()) + { + if (activation.getTopicTemporaryQueue() == null) + { + tempQueueName = new SimpleString(UUID.randomUUID().toString()); + session.createTemporaryQueue(activation.getAddress(), tempQueueName, selectorString); + activation.setTopicTemporaryQueue(tempQueueName); + } + else + { + tempQueueName = activation.getTopicTemporaryQueue(); + QueueQuery queueQuery = session.queueQuery(tempQueueName); + if (!queueQuery.isExists()) + { + // this is because we could be using remote servers (in cluster maybe) + // and the queue wasn't created on that node yet. 
+ session.createTemporaryQueue(activation.getAddress(), tempQueueName, selectorString); + } + } + } + else + { + tempQueueName = activation.getAddress(); + } + consumer = (ClientConsumerInternal) session.createConsumer(tempQueueName, selectorString); + } + + // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX + MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory(); + useLocalTx = !activation.isDeliveryTransacted() && activation.getActivationSpec().isUseLocalTx(); + transacted = activation.isDeliveryTransacted(); + if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx()) + { + XAResource xaResource = new ActiveMQXAResourceWrapper(session, + ((ActiveMQResourceAdapter) spec.getResourceAdapter()).getJndiName(), + ((ClientSessionFactoryInternal) cf).getLiveNodeId()); + endpoint = endpointFactory.createEndpoint(xaResource); + useXA = true; + } + else + { + endpoint = endpointFactory.createEndpoint(null); + useXA = false; + } + consumer.setMessageHandler(this); + } + + XAResource getXAResource() + { + return useXA ? 
session : null; + } + + public Thread interruptConsumer(FutureLatch future) + { + try + { + if (consumer != null) + { + return consumer.prepareForClose(future); + } + } + catch (Throwable e) + { + ActiveMQRALogger.LOGGER.warn("Error interrupting handler on endpoint " + endpoint + " handler=" + consumer); + } + return null; + } + + /** + * Stop the handler + */ + public void teardown() + { + if (ActiveMQMessageHandler.trace) + { + ActiveMQRALogger.LOGGER.trace("teardown()"); + } + + try + { + if (endpoint != null) + { + endpoint.release(); + endpoint = null; + } + } + catch (Throwable t) + { + ActiveMQRALogger.LOGGER.debug("Error releasing endpoint " + endpoint, t); + } + + try + { + consumer.close(); + if (activation.getTopicTemporaryQueue() != null) + { + // We need to delete temporary topics when the activation is stopped or messages will build up on the server + SimpleString tmpQueue = activation.getTopicTemporaryQueue(); + QueueQuery subResponse = session.queueQuery(tmpQueue); + if (subResponse.getConsumerCount() == 0) + { + // This is optional really, since we now use temporaryQueues, we could simply ignore this + // and the server temporary queue would remove this as soon as the queue was removed + session.deleteQueue(tmpQueue); + } + } + } + catch (Throwable t) + { + ActiveMQRALogger.LOGGER.debug("Error closing core-queue consumer", t); + } + + try + { + if (session != null) + { + session.close(); + } + } + catch (Throwable t) + { + ActiveMQRALogger.LOGGER.debug("Error releasing session " + session, t); + } + + try + { + if (cf != null) + { + cf.close(); + } + } + catch (Throwable t) + { + ActiveMQRALogger.LOGGER.debug("Error releasing session factory " + session, t); + } + } + + public void onMessage(final ClientMessage message) + { + if (ActiveMQMessageHandler.trace) + { + ActiveMQRALogger.LOGGER.trace("onMessage(" + message + ")"); + } + + ActiveMQMessage msg = ActiveMQMessage.createMessage(message, session); + boolean beforeDelivery = false; + + try + { 
+ if (activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null) + { + tm.setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout()); + } + endpoint.beforeDelivery(ActiveMQActivation.ONMESSAGE); + beforeDelivery = true; + msg.doBeforeReceive(); + + //In the transacted case the message must be acked *before* onMessage is called + + if (transacted) + { + message.acknowledge(); + } + + ((MessageListener) endpoint).onMessage(msg); + + if (!transacted) + { + message.acknowledge(); + } + + try + { + endpoint.afterDelivery(); + } + catch (ResourceException e) + { + ActiveMQRALogger.LOGGER.unableToCallAfterDelivery(e); + return; + } + if (useLocalTx) + { + session.commit(); + } + + if (trace) + { + ActiveMQRALogger.LOGGER.trace("finished onMessage on " + message); + } + } + catch (Throwable e) + { + ActiveMQRALogger.LOGGER.errorDeliveringMessage(e); + // we need to call before/afterDelivery as a pair + if (beforeDelivery) + { + if (useXA && tm != null) + { + // This is the job for the container, + // however if the container throws an exception because of some other errors, + // there are situations where the container is not setting the rollback only + // this is to avoid a scenario where afterDelivery would kick in + try + { + Transaction tx = tm.getTransaction(); + if (tx != null) + { + tx.setRollbackOnly(); + } + } + catch (Exception e1) + { + ActiveMQRALogger.LOGGER.warn("unnable to clear the transaction", e1); + try + { + session.rollback(); + } + catch (ActiveMQException e2) + { + ActiveMQRALogger.LOGGER.warn("Unable to rollback", e2); + return; + } + } + } + + MessageEndpoint endToUse = endpoint; + try + { + // to avoid a NPE that would happen while the RA is in tearDown + if (endToUse != null) + { + endToUse.afterDelivery(); + } + } + catch (ResourceException e1) + { + ActiveMQRALogger.LOGGER.unableToCallAfterDelivery(e1); + } + } + if (useLocalTx || !activation.isDeliveryTransacted()) + { + try + { + session.rollback(true); + } + 
catch (ActiveMQException e1) + { + ActiveMQRALogger.LOGGER.unableToRollbackTX(); + } + } + } + finally + { + try + { + session.resetIfNeeded(); + } + catch (ActiveMQException e) + { + ActiveMQRALogger.LOGGER.unableToResetSession(); + } + } + + } + + public void start() throws ActiveMQException + { + session.start(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/HornetQActivation.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/HornetQActivation.java b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/HornetQActivation.java deleted file mode 100644 index 8bf12b6..0000000 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/HornetQActivation.java +++ /dev/null @@ -1,831 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ -package org.apache.activemq.ra.inflow; - -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Queue; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.resource.ResourceException; -import javax.resource.spi.endpoint.MessageEndpointFactory; -import javax.resource.spi.work.Work; -import javax.resource.spi.work.WorkManager; -import javax.transaction.xa.XAResource; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.ActiveMQExceptionType; -import org.apache.activemq.api.core.ActiveMQNonExistentQueueException; -import org.apache.activemq.api.core.ActiveMQNotConnectedException; -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.api.core.client.ClientSession; -import org.apache.activemq.api.core.client.ClientSessionFactory; -import org.apache.activemq.api.jms.HornetQJMSClient; -import org.apache.activemq.core.client.impl.ClientSessionInternal; -import org.apache.activemq.jms.client.HornetQConnectionFactory; -import org.apache.activemq.jms.client.HornetQDestination; -import org.apache.activemq.jms.server.recovery.XARecoveryConfig; -import org.apache.activemq.ra.HornetQRABundle; -import org.apache.activemq.ra.HornetQRAConnectionFactory; -import org.apache.activemq.ra.HornetQRALogger; -import org.apache.activemq.ra.HornetQRaUtils; -import org.apache.activemq.ra.HornetQResourceAdapter; -import org.apache.activemq.utils.FutureLatch; -import org.apache.activemq.utils.SensitiveDataCodec; - -/** - * The activation. 
- * - * @author <a href="adr...@jboss.com">Adrian Brock</a> - * @author <a href="jesper.peder...@jboss.org">Jesper Pedersen</a> - * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a> - */ -public class HornetQActivation -{ - /** - * Trace enabled - */ - private static boolean trace = HornetQRALogger.LOGGER.isTraceEnabled(); - - /** - * The onMessage method - */ - public static final Method ONMESSAGE; - - /** - * The resource adapter - */ - private final HornetQResourceAdapter ra; - - /** - * The activation spec - */ - private final HornetQActivationSpec spec; - - /** - * The message endpoint factory - */ - private final MessageEndpointFactory endpointFactory; - - /** - * Whether delivery is active - */ - private final AtomicBoolean deliveryActive = new AtomicBoolean(false); - - /** - * The destination type - */ - private boolean isTopic = false; - - /** - * Is the delivery transacted - */ - private boolean isDeliveryTransacted; - - private HornetQDestination destination; - - /** - * The name of the temporary subscription name that all the sessions will share - */ - private SimpleString topicTemporaryQueue; - - private final List<HornetQMessageHandler> handlers = new ArrayList<HornetQMessageHandler>(); - - private HornetQConnectionFactory factory; - - // Whether we are in the failure recovery loop - private final AtomicBoolean inFailure = new AtomicBoolean(false); - private XARecoveryConfig resourceRecovery; - - static - { - try - { - ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[]{Message.class}); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - /** - * Constructor - * - * @param ra The resource adapter - * @param endpointFactory The endpoint factory - * @param spec The activation spec - * @throws ResourceException Thrown if an error occurs - */ - public HornetQActivation(final HornetQResourceAdapter ra, - final MessageEndpointFactory endpointFactory, - final HornetQActivationSpec spec) throws 
ResourceException - { - spec.validate(); - - if (HornetQActivation.trace) - { - HornetQRALogger.LOGGER.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")"); - } - - if (ra.isUseMaskedPassword()) - { - String pass = spec.getOwnPassword(); - if (pass != null) - { - SensitiveDataCodec<String> codec = ra.getCodecInstance(); - - try - { - spec.setPassword(codec.decode(pass)); - } - catch (Exception e) - { - throw new ResourceException(e); - } - } - } - - this.ra = ra; - this.endpointFactory = endpointFactory; - this.spec = spec; - try - { - isDeliveryTransacted = endpointFactory.isDeliveryTransacted(HornetQActivation.ONMESSAGE); - } - catch (Exception e) - { - throw new ResourceException(e); - } - } - - /** - * Get the activation spec - * - * @return The value - */ - public HornetQActivationSpec getActivationSpec() - { - if (HornetQActivation.trace) - { - HornetQRALogger.LOGGER.trace("getActivationSpec()"); - } - - return spec; - } - - /** - * Get the message endpoint factory - * - * @return The value - */ - public MessageEndpointFactory getMessageEndpointFactory() - { - if (HornetQActivation.trace) - { - HornetQRALogger.LOGGER.trace("getMessageEndpointFactory()"); - } - - return endpointFactory; - } - - /** - * Get whether delivery is transacted - * - * @return The value - */ - public boolean isDeliveryTransacted() - { - if (HornetQActivation.trace) - { - HornetQRALogger.LOGGER.trace("isDeliveryTransacted()"); - } - - return isDeliveryTransacted; - } - - /** - * Get the work manager - * - * @return The value - */ - public WorkManager getWorkManager() - { - if (HornetQActivation.trace) - { - HornetQRALogger.LOGGER.trace("getWorkManager()"); - } - - return ra.getWorkManager(); - } - - /** - * Is the destination a topic - * - * @return The value - */ - public boolean isTopic() - { - if (HornetQActivation.trace) - { - HornetQRALogger.LOGGER.trace("isTopic()"); - } - - return isTopic; - } - - /** - * Start the activation - * - * @throws ResourceException 
Thrown if an error occurs - */ - public void start() throws ResourceException - { - if (HornetQActivation.trace) - { - HornetQRALogger.LOGGER.trace("start()"); - } - deliveryActive.set(true); - ra.getWorkManager().scheduleWork(new SetupActivation()); - } - - /** - * @return the topicTemporaryQueue - */ - public SimpleString getTopicTemporaryQueue() - { - return topicTemporaryQueue; - } - - /** - * @param topicTemporaryQueue the topicTemporaryQueue to set - */ - public void setTopicTemporaryQueue(SimpleString topicTemporaryQueue) - { - this.topicTemporaryQueue = topicTemporaryQueue; - } - - /** - * @return the list of XAResources for this activation endpoint - */ - public List<XAResource> getXAResources() - { - List<XAResource> xaresources = new ArrayList<XAResource>(); - for (HornetQMessageHandler handler : handlers) - { - XAResource xares = handler.getXAResource(); - if (xares != null) - { - xaresources.add(xares); - } - } - return xaresources; - } - - /** - * Stop the activation - */ - public void stop() - { - if (HornetQActivation.trace) - { - HornetQRALogger.LOGGER.trace("stop()"); - } - - deliveryActive.set(false); - teardown(); - } - - /** - * Setup the activation - * - * @throws Exception Thrown if an error occurs - */ - protected synchronized void setup() throws Exception - { - HornetQRALogger.LOGGER.debug("Setting up " + spec); - - setupCF(); - - setupDestination(); - - Exception firstException = null; - - for (int i = 0; i < spec.getMaxSession(); i++) - { - ClientSessionFactory cf = null; - ClientSession session = null; - - try - { - cf = factory.getServerLocator().createSessionFactory(); - session = setupSession(cf); - HornetQMessageHandler handler = new HornetQMessageHandler(this, ra.getTM(), (ClientSessionInternal) session, cf, i); - handler.setup(); - handlers.add(handler); - } - catch (Exception e) - { - if (cf != null) - { - cf.close(); - } - if (session != null) - { - session.close(); - } - if (firstException == null) - { - firstException = e; - } 
- } - } - //if we have any exceptions close all the handlers and throw the first exception. - //we don't want partially configured activations, i.e. only 8 out of 15 sessions started so best to stop and log the error. - if (firstException != null) - { - for (HornetQMessageHandler handler : handlers) - { - handler.teardown(); - } - throw firstException; - } - - //now start them all together. - for (HornetQMessageHandler handler : handlers) - { - handler.start(); - } - - resourceRecovery = ra.getRecoveryManager().register(factory, spec.getUser(), spec.getPassword()); - - HornetQRALogger.LOGGER.debug("Setup complete " + this); - } - - /** - * Teardown the activation - */ - protected synchronized void teardown() - { - HornetQRALogger.LOGGER.debug("Tearing down " + spec); - - if (resourceRecovery != null) - { - ra.getRecoveryManager().unRegister(resourceRecovery); - } - - final HornetQMessageHandler[] handlersCopy = new HornetQMessageHandler[handlers.size()]; - - // We need to do from last to first as any temporary queue will have been created on the first handler - // So we invert the handlers here - for (int i = 0; i < handlers.size(); i++) - { - // The index here is the complimentary so it's inverting the array - handlersCopy[i] = handlers.get(handlers.size() - i - 1); - } - - handlers.clear(); - - FutureLatch future = new FutureLatch(handlersCopy.length); - List<Thread> interruptThreads = new ArrayList<>(); - for (HornetQMessageHandler handler : handlersCopy) - { - Thread thread = handler.interruptConsumer(future); - if (thread != null) - { - interruptThreads.add(thread); - } - } - - //wait for all the consumers to complete any onmessage calls - boolean stuckThreads = !future.await(factory.getCallTimeout()); - //if any are stuck then we need to interrupt them - if (stuckThreads) - { - for (Thread interruptThread : interruptThreads) - { - try - { - interruptThread.interrupt(); - } - catch (Exception e) - { - //ok - } - } - } - - Thread threadTearDown = new 
Thread("TearDown/HornetQActivation")
      {
         public void run()
         {
            for (HornetQMessageHandler handler : handlersCopy)
            {
               handler.teardown();
            }
         }
      };

      // We first start a new thread that calls teardown() on all the handlers, trying to shut everything down gracefully.
      // The call-timeout of the connection factory bounds how long we wait for that thread.
      // If it has not finished by then, we close the connection factory and interrupt the thread.
      threadTearDown.start();

      try
      {
         // NOTE(review): factory is dereferenced here without a null check, while the code below guards
         // every other use with "factory != null" - confirm factory cannot be null at this point.
         threadTearDown.join(factory.getCallTimeout());
      }
      catch (InterruptedException e)
      {
         // nothing to be done in this context.. we just keep going, as we need to send an interrupt to threadTearDown and give up
      }

      if (threadTearDown.isAlive())
      {
         if (factory != null)
         {
            // This will interrupt any threads waiting on reconnect
            factory.close();
            factory = null;
         }
         threadTearDown.interrupt();

         try
         {
            threadTearDown.join(5000);
         }
         catch (InterruptedException e)
         {
            // nothing to be done here.. we are going down anyway
         }

         if (threadTearDown.isAlive())
         {
            HornetQRALogger.LOGGER.warn("Thread " + threadTearDown + " couldn't be finished");
         }
      }

      if (spec.isHasBeenUpdated() && factory != null)
      {
         factory.close();
         factory = null;
      }


      HornetQRALogger.LOGGER.debug("Tearing down complete " + this);
   }

   /**
    * Resolves the connection factory used by this activation and stores it in {@code factory}.
    * <p>
    * When the spec names a connection factory lookup, the object is fetched from JNDI (using the
    * spec's parsed JNDI parameters when present). A {@code HornetQConnectionFactory} is used as is;
    * anything else is cast to {@code HornetQRAConnectionFactory}, from which either a new factory is
    * created for the spec (when the spec has been updated) or the default factory is taken.
    * Without a lookup name, the factory comes from this activation's resource adapter: a new one
    * when the spec has been updated, the adapter's default one otherwise.
    *
    * @throws Exception Thrown if the JNDI lookup or the factory creation fails
    */
   protected void setupCF() throws Exception
   {
      if (spec.getConnectionFactoryLookup() != null)
      {
         Context ctx;
         if (spec.getParsedJndiParams() == null)
         {
            ctx = new InitialContext();
         }
         else
         {
            ctx = new InitialContext(spec.getParsedJndiParams());
         }
         Object fac = ctx.lookup(spec.getConnectionFactoryLookup());
         if (fac instanceof HornetQConnectionFactory)
         {
            factory = (HornetQConnectionFactory) fac;
         }
         else
         {
            HornetQRAConnectionFactory raFact = (HornetQRAConnectionFactory) fac;
            if (spec.isHasBeenUpdated())
            {
               factory = raFact.getResourceAdapter().createHornetQConnectionFactory(spec);
            }
            else
            {
               factory = raFact.getDefaultFactory();
               // Warn when the looked-up factory is not the one this resource adapter uses by default
               if (factory != ra.getDefaultHornetQConnectionFactory())
               {
                  HornetQRALogger.LOGGER.warnDifferentConnectionfactory();
               }
            }
         }
      }
      else if (spec.isHasBeenUpdated())
      {
         factory = ra.createHornetQConnectionFactory(spec);
      }
      else
      {
         factory = ra.getDefaultHornetQConnectionFactory();
      }
   }

   /**
    * Setup a session
    * <p>
    * The session is created through the resource adapter with the settings of the activation spec
    * and tagged with the "resource-adapter", "jms-session" and (when a client id is available)
    * "jms-client-id" metadata. If anything fails after the session was created, the session is
    * closed before the failure is propagated.
    *
    * @param cf the session factory handed to the resource adapter to create the session
    * @return The session
    * @throws Exception Thrown if an error occurs; a non-{@code Exception} throwable is wrapped in a
    *                   {@code RuntimeException}
    */
   protected ClientSession setupSession(ClientSessionFactory cf) throws Exception
   {
      ClientSession result = null;

      try
      {
         result = ra.createSession(cf,
                                   spec.getAcknowledgeModeInt(),
                                   spec.getUser(),
                                   spec.getPassword(),
                                   ra.getPreAcknowledge(),
                                   ra.getDupsOKBatchSize(),
                                   ra.getTransactionBatchSize(),
                                   isDeliveryTransacted,
                                   spec.isUseLocalTx(),
                                   spec.getTransactionTimeout());

         result.addMetaData("resource-adapter", "inbound");
         result.addMetaData("jms-session", "");
         // The client id of the resource adapter takes precedence over the one of the spec
         String clientID = ra.getClientID() == null ? spec.getClientID() : ra.getClientID();
         if (clientID != null)
         {
            result.addMetaData("jms-client-id", clientID);
         }

         HornetQRALogger.LOGGER.debug("Using queue connection " + result);

         return result;
      }
      catch (Throwable t)
      {
         // Do not leak a half-configured session; a failure while closing must not hide the original error
         try
         {
            if (result != null)
            {
               result.close();
            }
         }
         catch (Exception e)
         {
            HornetQRALogger.LOGGER.trace("Ignored error closing connection", e);
         }
         if (t instanceof Exception)
         {
            throw (Exception) t;
         }
         throw new RuntimeException("Error configuring connection", t);
      }
   }

   /**
    * Get the address of the destination
    *
    * @return The simple address of the destination resolved by {@link #setupDestination()}
    */
   public SimpleString getAddress()
   {
      return destination.getSimpleAddress();
   }

   /**
    * Resolves the destination named by the activation spec and stores it in {@code destination},
    * setting {@code isTopic} when it is a topic.
    * <p>
    * With JNDI enabled and a destination type configured, the destination is looked up with that
    * type; if the lookup fails, a new queue or topic is created from the last path segment of the
    * destination name. With JNDI enabled and no destination type, the destination is looked up as
    * a plain {@link Destination} and a failed lookup is propagated. Without JNDI, the destination
    * is created directly from the configured name.
    *
    * @throws Exception Thrown if the destination cannot be resolved
    */
   protected void setupDestination() throws Exception
   {

      String destinationName = spec.getDestination();

      if (spec.isUseJNDI())
      {
         Context ctx;
         if (spec.getParsedJndiParams() == null)
         {
            ctx = new InitialContext();
         }
         else
         {
            ctx = new InitialContext(spec.getParsedJndiParams());
         }
         HornetQRALogger.LOGGER.debug("Using context " + ctx.getEnvironment() + " for " + spec);
         if (HornetQActivation.trace)
         {
            HornetQRALogger.LOGGER.trace("setupDestination(" + ctx + ")");
         }

         String destinationTypeString = spec.getDestinationType();
         if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
         {
            HornetQRALogger.LOGGER.debug("Destination type defined as " + destinationTypeString);

            // Anything that is not explicitly a javax.jms.Topic is handled as a queue
            Class<?> destinationType;
            if (Topic.class.getName().equals(destinationTypeString))
            {
               destinationType = Topic.class;
               isTopic = true;
            }
            else
            {
               destinationType = Queue.class;
            }

            HornetQRALogger.LOGGER.debug("Retrieving " + destinationType.getName() + " \"" + destinationName + "\" from JNDI");

            try
            {
               destination = (HornetQDestination) HornetQRaUtils.lookup(ctx, destinationName, destinationType);
            }
            catch (Exception e)
            {
               if (destinationName == null)
               {
                  throw HornetQRABundle.BUNDLE.noDestinationName();
               }

               String calculatedDestinationName = destinationName.substring(destinationName.lastIndexOf('/') + 1);

               // Keep the lookup failure in the log instead of silently dropping it
               HornetQRALogger.LOGGER.debug("Unable to retrieve " + destinationName +
                                               " from JNDI. Creating a new " + destinationType.getName() +
                                               " named " + calculatedDestinationName + " to be used by the MDB.", e);

               // If there is no binding on naming, we will just create a new instance
               if (isTopic)
               {
                  destination = (HornetQDestination) HornetQJMSClient.createTopic(calculatedDestinationName);
               }
               else
               {
                  destination = (HornetQDestination) HornetQJMSClient.createQueue(calculatedDestinationName);
               }
            }
         }
         else
         {
            HornetQRALogger.LOGGER.debug("Destination type not defined in MDB activation configuration.");
            HornetQRALogger.LOGGER.debug("Retrieving " + Destination.class.getName() + " \"" + destinationName + "\" from JNDI");

            destination = (HornetQDestination) HornetQRaUtils.lookup(ctx, destinationName, Destination.class);
            if (destination instanceof Topic)
            {
               isTopic = true;
            }
         }
      }
      else
      {
         HornetQRALogger.LOGGER.instantiatingDestination(spec.getDestinationType(), spec.getDestination());

         if (Topic.class.getName().equals(spec.getDestinationType()))
         {
            destination = (HornetQDestination) HornetQJMSClient.createTopic(spec.getDestination());
            isTopic = true;
         }
         else
         {
            destination = (HornetQDestination) HornetQJMSClient.createQueue(spec.getDestination());
         }
      }
   }

   /**
    * Get a string representation
    *
    * @return The class name followed by the spec class, endpoint factory class, delivery state,
    *         destination (when configured) and transacted flag
    */
   @Override
   public String toString()
   {
      // The builder never leaves this method, so the synchronization of StringBuffer is not needed
      StringBuilder buffer = new StringBuilder();
      buffer.append(HornetQActivation.class.getName()).append('(');
      buffer.append("spec=").append(spec.getClass().getName());
      buffer.append(" mepf=").append(endpointFactory.getClass().getName());
      buffer.append(" active=").append(deliveryActive.get());
      if (spec.getDestination() != null)
      {
         buffer.append(" destination=").append(spec.getDestination());
      }
      buffer.append(" transacted=").append(isDeliveryTransacted);
      buffer.append(')');
      return buffer.toString();
   }

   /**
    * Handles any failure by trying to reconnect
    * <p>
    * The failure is logged first. Then, unless another invocation is already inside the recovery
    * loop, the activation is torn down and set up again, sleeping for the spec's setup interval
    * before every attempt. The loop ends on the first successful setup, when delivery is no longer
    * active, when the spec's number of setup attempts is used up (-1 means no limit) or when the
    * thread is interrupted while sleeping.
    *
    * @param failure the reason for the failure
    */
   public void handleFailure(Throwable failure)
   {
      // failure is never reassigned, so its classification is computed once and reused in the loop
      boolean queueMissing = failure instanceof ActiveMQException &&
         ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST;
      boolean notConnected = failure instanceof ActiveMQException &&
         ((ActiveMQException) failure).getType() == ActiveMQExceptionType.NOT_CONNECTED;

      if (queueMissing)
      {
         HornetQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
      }
      else if (notConnected)
      {
         HornetQRALogger.LOGGER.awaitingJMSServerCreation();
      }
      else
      {
         HornetQRALogger.LOGGER.failureInActivation(failure, spec);
      }
      int reconnectCount = 0;
      int setupAttempts = spec.getSetupAttempts();
      long setupInterval = spec.getSetupInterval();

      // Only enter the failure loop once
      if (inFailure.getAndSet(true))
      {
         return;
      }
      try
      {
         Throwable lastException = failure;
         while (deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
         {
            teardown();

            try
            {
               Thread.sleep(setupInterval);
            }
            catch (InterruptedException e)
            {
               // Restore the interrupt status so that the owner of this thread can still observe it
               Thread.currentThread().interrupt();
               HornetQRALogger.LOGGER.debug("Interrupted trying to reconnect " + spec, e);
               break;
            }

            if (reconnectCount < 1)
            {
               HornetQRALogger.LOGGER.attemptingReconnect(spec);
            }
            try
            {
               setup();
               HornetQRALogger.LOGGER.reconnected();
               break;
            }
            catch (Throwable t)
            {
               // NOTE(review): the branch is selected by the ORIGINAL failure, not by t; only the
               // inner checks look at t - confirm this is intended.
               if (queueMissing)
               {
                  if (lastException == null || !(t instanceof ActiveMQNonExistentQueueException))
                  {
                     lastException = t;
                     HornetQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
                  }
               }
               else if (notConnected)
               {
                  if (lastException == null || !(t instanceof ActiveMQNotConnectedException))
                  {
                     lastException = t;
                     HornetQRALogger.LOGGER.awaitingJMSServerCreation();
                  }
               }
               else
               {
                  HornetQRALogger.LOGGER.errorReconnecting(t, spec);
               }
            }
            ++reconnectCount;
         }
      }
      finally
      {
         // Leaving failure recovery loop
         inFailure.set(false);
      }
   }

   /**
    * Get the connection factory
    *
    * @return The connection factory currently held by this activation; teardown sets it to
    *         {@code null} after closing it
    */
   public HornetQConnectionFactory getConnectionFactory()
   {
      return this.factory;
   }

   /**
    * Handles the setup
    * <p>
    * Runs {@code setup()} as a unit of {@link Work}; any throwable raised by it is handed to
    * {@link HornetQActivation#handleFailure(Throwable)}.
    */
   private class SetupActivation implements Work
   {
      @Override
      public void run()
      {
         try
         {
            setup();
         }
         catch (Throwable t)
         {
            handleFailure(t);
         }
      }

      @Override
      public void release()
      {
         // Nothing to release
      }
   }
}