http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
 
b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
new file mode 100644
index 0000000..d618e42
--- /dev/null
+++ 
b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
@@ -0,0 +1,831 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra.inflow;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.xa.XAResource;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.ActiveMQExceptionType;
+import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
+import org.apache.activemq.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.jms.client.ActiveMQDestination;
+import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
+import org.apache.activemq.ra.ActiveMQRABundle;
+import org.apache.activemq.ra.ActiveMQRAConnectionFactory;
+import org.apache.activemq.ra.ActiveMQRALogger;
+import org.apache.activemq.ra.ActiveMQRaUtils;
+import org.apache.activemq.ra.ActiveMQResourceAdapter;
+import org.apache.activemq.utils.FutureLatch;
+import org.apache.activemq.utils.SensitiveDataCodec;
+
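+/**
+ * The activation: sets up, starts and tears down the message handlers that
+ * deliver messages to a message endpoint, and retries the setup after a failure.
+ *
+ * @author <a href="adr...@jboss.com">Adrian Brock</a>
+ * @author <a href="jesper.peder...@jboss.org">Jesper Pedersen</a>
+ * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a>
+ */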
+public class ActiveMQActivation
+{
+   /**
+    * Whether trace logging is enabled; evaluated once, when this class is initialized
+    */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   /**
+    * The {@code MessageListener.onMessage(Message)} method, looked up reflectively in the static initializer
+    */
+   public static final Method ONMESSAGE;
+
+   /**
+    * The resource adapter
+    */
+   private final ActiveMQResourceAdapter ra;
+
+   /**
+    * The activation spec
+    */
+   private final ActiveMQActivationSpec spec;
+
+   /**
+    * The message endpoint factory
+    */
+   private final MessageEndpointFactory endpointFactory;
+
+   /**
+    * Whether delivery is active: set by start(), cleared by stop() and checked by the reconnect loop in handleFailure()
+    */
+   private final AtomicBoolean deliveryActive = new AtomicBoolean(false);
+
+   /**
+    * Whether the destination is a topic; set to true by setupDestination() when a topic is resolved
+    */
+   private boolean isTopic = false;
+
+   /**
+    * Is the delivery transacted; obtained from the endpoint factory in the constructor
+    */
+   private boolean isDeliveryTransacted;
+
+   /**
+    * The resolved destination; assigned by setupDestination()
+    */
+   private ActiveMQDestination destination;
+
+   /**
+    * The name of the temporary subscription queue that all the sessions will share
+    */
+   private SimpleString topicTemporaryQueue;
+
+   /**
+    * The message handlers created by setup(), one per session; emptied by teardown()
+    */
+   private final List<ActiveMQMessageHandler> handlers = new ArrayList<ActiveMQMessageHandler>();
+
+   /**
+    * The connection factory selected by setupCF(); teardown() may close it and reset it to null
+    */
+   private ActiveMQConnectionFactory factory;
+
+   // Whether we are in the failure recovery loop
+   private final AtomicBoolean inFailure = new AtomicBoolean(false);
+
+   /**
+    * The recovery registration obtained at the end of setup() and unregistered by teardown()
+    */
+   private XARecoveryConfig resourceRecovery;
+
+   // Resolve MessageListener.onMessage(Message) once for the whole class; a lookup failure
+   // is rethrown unchecked and therefore aborts class initialization.
+   static
+   {
+      final Method onMessage;
+      try
+      {
+         onMessage = MessageListener.class.getMethod("onMessage", Message.class);
+      }
+      catch (Exception lookupFailure)
+      {
+         throw new RuntimeException(lookupFailure);
+      }
+      ONMESSAGE = onMessage;
+   }
+
+   /**
+    * Creates an activation for the given endpoint factory and activation spec.
+    * <p>
+    * The spec is validated first. When the resource adapter uses masked passwords and the
+    * spec carries its own password, that password is decoded with the adapter's codec and
+    * written back to the spec. Finally the endpoint factory is asked whether delivery of
+    * {@link #ONMESSAGE} is transacted.
+    *
+    * @param ra              The resource adapter
+    * @param endpointFactory The endpoint factory
+    * @param spec            The activation spec
+    * @throws ResourceException Thrown if an error occurs; failures while decoding the password
+    *                           or querying the endpoint factory are wrapped in it
+    */
+   public ActiveMQActivation(final ActiveMQResourceAdapter ra,
+                             final MessageEndpointFactory endpointFactory,
+                             final ActiveMQActivationSpec spec) throws ResourceException
+   {
+      spec.validate();
+
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
+      }
+
+      // Only look at the spec's own password when the adapter is configured for masking
+      final String maskedPassword = ra.isUseMaskedPassword() ? spec.getOwnPassword() : null;
+      if (maskedPassword != null)
+      {
+         final SensitiveDataCodec<String> passwordCodec = ra.getCodecInstance();
+
+         try
+         {
+            spec.setPassword(passwordCodec.decode(maskedPassword));
+         }
+         catch (Exception decodeFailure)
+         {
+            throw new ResourceException(decodeFailure);
+         }
+      }
+
+      this.ra = ra;
+      this.endpointFactory = endpointFactory;
+      this.spec = spec;
+
+      try
+      {
+         this.isDeliveryTransacted = endpointFactory.isDeliveryTransacted(ONMESSAGE);
+      }
+      catch (Exception lookupFailure)
+      {
+         throw new ResourceException(lookupFailure);
+      }
+   }
+
+   /**
+    * Returns the activation spec this activation was created with.
+    *
+    * @return The activation spec
+    */
+   public ActiveMQActivationSpec getActivationSpec()
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getActivationSpec()");
+      }
+
+      return this.spec;
+   }
+
+   /**
+    * Returns the message endpoint factory this activation was created with.
+    *
+    * @return The message endpoint factory
+    */
+   public MessageEndpointFactory getMessageEndpointFactory()
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getMessageEndpointFactory()");
+      }
+
+      return this.endpointFactory;
+   }
+
+   /**
+    * Tells whether message delivery is transacted, as reported by the endpoint factory
+    * when this activation was constructed.
+    *
+    * @return {@code true} if delivery is transacted
+    */
+   public boolean isDeliveryTransacted()
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("isDeliveryTransacted()");
+      }
+
+      return this.isDeliveryTransacted;
+   }
+
+   /**
+    * Returns the work manager of the resource adapter.
+    *
+    * @return The work manager
+    */
+   public WorkManager getWorkManager()
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getWorkManager()");
+      }
+
+      final WorkManager workManager = ra.getWorkManager();
+      return workManager;
+   }
+
+   /**
+    * Tells whether the destination was resolved as a topic.
+    *
+    * @return {@code true} if the destination is a topic
+    */
+   public boolean isTopic()
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("isTopic()");
+      }
+
+      return this.isTopic;
+   }
+
+   /**
+    * Start the activation: marks delivery as active and schedules the actual setup
+    * on the resource adapter's work manager.
+    *
+    * @throws ResourceException Thrown if an error occurs
+    */
+   public void start() throws ResourceException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("start()");
+      }
+
+      deliveryActive.set(true);
+
+      final WorkManager workManager = ra.getWorkManager();
+      workManager.scheduleWork(new SetupActivation());
+   }
+
+   /**
+    * Returns the name of the temporary subscription queue shared by all sessions.
+    *
+    * @return the topicTemporaryQueue, as last set through {@link #setTopicTemporaryQueue(SimpleString)}
+    */
+   public SimpleString getTopicTemporaryQueue()
+   {
+      return this.topicTemporaryQueue;
+   }
+
+   /**
+    * Records the name of the temporary subscription queue shared by all sessions.
+    *
+    * @param topicTemporaryQueue the topicTemporaryQueue to set
+    */
+   public void setTopicTemporaryQueue(final SimpleString topicTemporaryQueue)
+   {
+      this.topicTemporaryQueue = topicTemporaryQueue;
+   }
+
+   /**
+    * Collects the XA resources of the current message handlers, skipping handlers that have none.
+    *
+    * @return the list of XAResources for this activation endpoint (a new list on every call)
+    */
+   public List<XAResource> getXAResources()
+   {
+      final List<XAResource> collected = new ArrayList<XAResource>();
+      for (final ActiveMQMessageHandler messageHandler : handlers)
+      {
+         final XAResource candidate = messageHandler.getXAResource();
+         if (candidate == null)
+         {
+            continue;
+         }
+         collected.add(candidate);
+      }
+      return collected;
+   }
+
+   /**
+    * Stop the activation: marks delivery as inactive, which ends the reconnect loop in
+    * handleFailure(), and then tears the activation down.
+    */
+   public void stop()
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("stop()");
+      }
+
+      // Clear the flag before tearing down so a concurrent recovery loop does not set up again
+      deliveryActive.set(false);
+
+      teardown();
+   }
+
+   /**
+    * Setup the activation
+    * <p>
+    * Resolves the connection factory and the destination, then creates one session factory,
+    * session and message handler per configured session. Handlers are only started once all of
+    * them were created; if any creation failed, the handlers created so far are torn down and
+    * the first failure is rethrown. On success the factory is registered with the recovery manager.
+    *
+    * @throws Exception Thrown if an error occurs
+    */
+   protected synchronized void setup() throws Exception
+   {
+      ActiveMQRALogger.LOGGER.debug("Setting up " + spec);
+
+      setupCF();
+
+      setupDestination();
+
+      Exception firstException = null;
+
+      for (int i = 0; i < spec.getMaxSession(); i++)
+      {
+         ClientSessionFactory cf = null;
+         ClientSession session = null;
+
+         try
+         {
+            cf = factory.getServerLocator().createSessionFactory();
+            session = setupSession(cf);
+            ActiveMQMessageHandler handler = new ActiveMQMessageHandler(this, ra.getTM(), (ClientSessionInternal) session, cf, i);
+            handler.setup();
+            handlers.add(handler);
+         }
+         catch (Exception e)
+         {
+            // Remember the real cause before cleaning up
+            if (firstException == null)
+            {
+               firstException = e;
+            }
+
+            // Cleanup is best effort: a failure while closing must neither hide the original
+            // exception nor prevent the already created handlers from being torn down below.
+            if (cf != null)
+            {
+               try
+               {
+                  cf.close();
+               }
+               catch (Exception closeFailure)
+               {
+                  ActiveMQRALogger.LOGGER.trace("Ignored error closing session factory", closeFailure);
+               }
+            }
+            if (session != null)
+            {
+               try
+               {
+                  session.close();
+               }
+               catch (Exception closeFailure)
+               {
+                  ActiveMQRALogger.LOGGER.trace("Ignored error closing session", closeFailure);
+               }
+            }
+         }
+      }
+      //if we have any exceptions close all the handlers and throw the first exception.
+      //we don't want partially configured activations, i.e. only 8 out of 15 sessions started so best to stop and log the error.
+      if (firstException != null)
+      {
+         for (ActiveMQMessageHandler handler : handlers)
+         {
+            handler.teardown();
+         }
+         // The handlers above are dead now; drop them so that they are neither reported by
+         // getXAResources() nor processed a second time by the next teardown().
+         handlers.clear();
+         throw firstException;
+      }
+
+      //now start them all together.
+      for (ActiveMQMessageHandler handler : handlers)
+      {
+         handler.start();
+      }
+
+      resourceRecovery = ra.getRecoveryManager().register(factory, spec.getUser(), spec.getPassword());
+
+      ActiveMQRALogger.LOGGER.debug("Setup complete " + this);
+   }
+
+   /**
+    * Teardown the activation
+    * <p>
+    * Unregisters the recovery registration, asks every handler (last created first) to interrupt
+    * its consumer, waits up to the call timeout for running onMessage calls and interrupts the
+    * threads that are still busy. The handlers are then torn down on a helper thread; if that
+    * thread does not finish within the call timeout the connection factory is closed and the
+    * helper thread is interrupted.
+    * <p>
+    * Safe to call when setup() never completed, i.e. when there is no connection factory.
+    */
+   protected synchronized void teardown()
+   {
+      ActiveMQRALogger.LOGGER.debug("Tearing down " + spec);
+
+      if (resourceRecovery != null)
+      {
+         ra.getRecoveryManager().unRegister(resourceRecovery);
+         // Forget the registration so that a repeated teardown() (the reconnect loop calls it
+         // before every attempt) does not unregister the same configuration again.
+         resourceRecovery = null;
+      }
+
+      // The factory is null when setupCF() has not succeeded yet or when an earlier teardown()
+      // closed it. Normally there are no handlers to wait for in that case, so any finite
+      // fallback works; 30 seconds is an arbitrary choice.
+      final long callTimeout = factory != null ? factory.getCallTimeout() : 30000L;
+
+      final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()];
+
+      // We need to do from last to first as any temporary queue will have been created on the first handler
+      // So we invert the handlers here
+      for (int i = 0; i < handlers.size(); i++)
+      {
+         // The index here is the complementary one, so it's inverting the array
+         handlersCopy[i] = handlers.get(handlers.size() - i - 1);
+      }
+
+      handlers.clear();
+
+      FutureLatch future = new FutureLatch(handlersCopy.length);
+      List<Thread> interruptThreads = new ArrayList<>();
+      for (ActiveMQMessageHandler handler : handlersCopy)
+      {
+         Thread thread = handler.interruptConsumer(future);
+         if (thread != null)
+         {
+            interruptThreads.add(thread);
+         }
+      }
+
+      //wait for all the consumers to complete any onmessage calls
+      boolean stuckThreads = !future.await(callTimeout);
+      //if any are stuck then we need to interrupt them
+      if (stuckThreads)
+      {
+         for (Thread interruptThread : interruptThreads)
+         {
+            try
+            {
+               interruptThread.interrupt();
+            }
+            catch (Exception e)
+            {
+               //ok
+            }
+         }
+      }
+
+      Thread threadTearDown = new Thread("TearDown/ActiveMQActivation")
+      {
+         @Override
+         public void run()
+         {
+            for (ActiveMQMessageHandler handler : handlersCopy)
+            {
+               handler.teardown();
+            }
+         }
+      };
+
+      // We will first start a new thread that will call tearDown on all the instances, trying to gracefully shutdown everything.
+      // We will then use the call-timeout to determine a timeout.
+      // if that failed we will then close the connection factory, and interrupt the thread
+      threadTearDown.start();
+
+      try
+      {
+         threadTearDown.join(callTimeout);
+      }
+      catch (InterruptedException e)
+      {
+         // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
+      }
+
+      if (threadTearDown.isAlive())
+      {
+         if (factory != null)
+         {
+            // This will interrupt any threads waiting on reconnect
+            factory.close();
+            factory = null;
+         }
+         threadTearDown.interrupt();
+
+         try
+         {
+            threadTearDown.join(5000);
+         }
+         catch (InterruptedException e)
+         {
+            // nothing to be done here.. we are going down anyways
+         }
+
+         if (threadTearDown.isAlive())
+         {
+            ActiveMQRALogger.LOGGER.warn("Thread " + threadTearDown + " couldn't be finished");
+         }
+      }
+
+      // A factory created specifically for an updated spec is owned by this activation, so close it
+      if (spec.isHasBeenUpdated() && factory != null)
+      {
+         factory.close();
+         factory = null;
+      }
+
+
+      ActiveMQRALogger.LOGGER.debug("Tearing down complete " + this);
+   }
+
+   /**
+    * Selects the connection factory used by this activation.
+    * <p>
+    * Without a connection factory lookup name the factory comes from the resource adapter: a new
+    * one built from the spec when the spec has been updated, the adapter's default one otherwise.
+    * With a lookup name the object is fetched from JNDI; an ActiveMQConnectionFactory is used
+    * directly, anything else is treated as an ActiveMQRAConnectionFactory whose adapter or
+    * default factory is used following the same updated/default rule.
+    *
+    * @throws Exception Thrown if the JNDI lookup or the factory creation fails
+    */
+   protected void setupCF() throws Exception
+   {
+      if (spec.getConnectionFactoryLookup() == null)
+      {
+         // No JNDI name configured: rely on the resource adapter
+         if (spec.isHasBeenUpdated())
+         {
+            factory = ra.createActiveMQConnectionFactory(spec);
+         }
+         else
+         {
+            factory = ra.getDefaultActiveMQConnectionFactory();
+         }
+         return;
+      }
+
+      final Context ctx = spec.getParsedJndiParams() == null ? new InitialContext()
+                                                             : new InitialContext(spec.getParsedJndiParams());
+
+      final Object fac = ctx.lookup(spec.getConnectionFactoryLookup());
+      if (fac instanceof ActiveMQConnectionFactory)
+      {
+         factory = (ActiveMQConnectionFactory) fac;
+         return;
+      }
+
+      final ActiveMQRAConnectionFactory raFact = (ActiveMQRAConnectionFactory) fac;
+      if (spec.isHasBeenUpdated())
+      {
+         factory = raFact.getResourceAdapter().createActiveMQConnectionFactory(spec);
+         return;
+      }
+
+      factory = raFact.getDefaultFactory();
+      if (factory != ra.getDefaultActiveMQConnectionFactory())
+      {
+         // The looked-up factory belongs to a different adapter configuration than ours
+         ActiveMQRALogger.LOGGER.warnDifferentConnectionfactory();
+      }
+   }
+
+   /**
+    * Creates a session on the given session factory and tags it with resource adapter metadata.
+    * <p>
+    * If anything fails after the session was created, the session is closed (errors while
+    * closing are only traced) before the failure is propagated.
+    *
+    * @param cf the session factory to create the session on
+    * @return The new session
+    * @throws Exception Thrown if an error occurs; a failure that is not an {@link Exception} is
+    *                   wrapped in a {@link RuntimeException}
+    */
+   protected ClientSession setupSession(ClientSessionFactory cf) throws Exception
+   {
+      ClientSession session = null;
+
+      try
+      {
+         session = ra.createSession(cf,
+                                    spec.getAcknowledgeModeInt(),
+                                    spec.getUser(),
+                                    spec.getPassword(),
+                                    ra.getPreAcknowledge(),
+                                    ra.getDupsOKBatchSize(),
+                                    ra.getTransactionBatchSize(),
+                                    isDeliveryTransacted,
+                                    spec.isUseLocalTx(),
+                                    spec.getTransactionTimeout());
+
+         session.addMetaData("resource-adapter", "inbound");
+         session.addMetaData("jms-session", "");
+
+         // The adapter-wide client id wins over the one configured on the spec
+         final String clientID;
+         if (ra.getClientID() == null)
+         {
+            clientID = spec.getClientID();
+         }
+         else
+         {
+            clientID = ra.getClientID();
+         }
+         if (clientID != null)
+         {
+            session.addMetaData("jms-client-id", clientID);
+         }
+
+         ActiveMQRALogger.LOGGER.debug("Using queue connection " + session);
+
+         return session;
+      }
+      catch (Throwable failure)
+      {
+         try
+         {
+            if (session != null)
+            {
+               session.close();
+            }
+         }
+         catch (Exception closeFailure)
+         {
+            ActiveMQRALogger.LOGGER.trace("Ignored error closing connection", closeFailure);
+         }
+
+         if (!(failure instanceof Exception))
+         {
+            throw new RuntimeException("Error configuring connection", failure);
+         }
+         throw (Exception) failure;
+      }
+   }
+
+   /**
+    * Returns the address of the resolved destination.
+    *
+    * @return the destination's simple address
+    */
+   public SimpleString getAddress()
+   {
+      final SimpleString address = destination.getSimpleAddress();
+      return address;
+   }
+
+   /**
+    * Resolves the destination named by the spec and records whether it is a topic.
+    * <p>
+    * Without JNDI the destination is instantiated directly as a topic or a queue, depending on
+    * the configured destination type. With JNDI it is looked up; when a destination type is
+    * configured and the lookup fails, a new topic or queue named after the last path element of
+    * the destination name is created instead.
+    *
+    * @throws Exception Thrown if the lookup fails and no fallback applies, or if no destination name is set
+    */
+   protected void setupDestination() throws Exception
+   {
+
+      final String destinationName = spec.getDestination();
+
+      if (!spec.isUseJNDI())
+      {
+         ActiveMQRALogger.LOGGER.instantiatingDestination(spec.getDestinationType(), spec.getDestination());
+
+         if (Topic.class.getName().equals(spec.getDestinationType()))
+         {
+            destination = (ActiveMQDestination) ActiveMQJMSClient.createTopic(spec.getDestination());
+            isTopic = true;
+         }
+         else
+         {
+            destination = (ActiveMQDestination) ActiveMQJMSClient.createQueue(spec.getDestination());
+         }
+         return;
+      }
+
+      final Context ctx = spec.getParsedJndiParams() == null ? new InitialContext()
+                                                             : new InitialContext(spec.getParsedJndiParams());
+
+      ActiveMQRALogger.LOGGER.debug("Using context " + ctx.getEnvironment() + " for " + spec);
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setupDestination(" + ctx + ")");
+      }
+
+      final String destinationTypeString = spec.getDestinationType();
+      if (destinationTypeString == null || destinationTypeString.trim().equals(""))
+      {
+         // No type configured: look up a generic Destination and inspect what came back
+         ActiveMQRALogger.LOGGER.debug("Destination type not defined in MDB activation configuration.");
+         ActiveMQRALogger.LOGGER.debug("Retrieving " + Destination.class.getName() + " \"" + destinationName + "\" from JNDI");
+
+         destination = (ActiveMQDestination) ActiveMQRaUtils.lookup(ctx, destinationName, Destination.class);
+         if (destination instanceof Topic)
+         {
+            isTopic = true;
+         }
+         return;
+      }
+
+      ActiveMQRALogger.LOGGER.debug("Destination type defined as " + destinationTypeString);
+
+      // Anything that is not exactly the Topic interface name is treated as a queue
+      final boolean topicRequested = Topic.class.getName().equals(destinationTypeString);
+      final Class<?> destinationType = topicRequested ? Topic.class : Queue.class;
+      if (topicRequested)
+      {
+         isTopic = true;
+      }
+
+      ActiveMQRALogger.LOGGER.debug("Retrieving " + destinationType.getName() + " \"" + destinationName + "\" from JNDI");
+
+      try
+      {
+         destination = (ActiveMQDestination) ActiveMQRaUtils.lookup(ctx, destinationName, destinationType);
+      }
+      catch (Exception lookupFailure)
+      {
+         if (destinationName == null)
+         {
+            throw ActiveMQRABundle.BUNDLE.noDestinationName();
+         }
+
+         // Keep only what follows the last '/', e.g. the trailing element of a JNDI path
+         final int lastSlash = destinationName.lastIndexOf('/');
+         final String calculatedDestinationName = destinationName.substring(lastSlash + 1);
+
+         ActiveMQRALogger.LOGGER.debug("Unable to retrieve " + destinationName +
+                                          " from JNDI. Creating a new " + destinationType.getName() +
+                                          " named " + calculatedDestinationName + " to be used by the MDB.");
+
+         // If there is no binding on naming, we will just create a new instance
+         if (isTopic)
+         {
+            destination = (ActiveMQDestination) ActiveMQJMSClient.createTopic(calculatedDestinationName);
+         }
+         else
+         {
+            destination = (ActiveMQDestination) ActiveMQJMSClient.createQueue(calculatedDestinationName);
+         }
+      }
+   }
+
+   /**
+    * Builds a description made of the class name, the spec and endpoint factory class names,
+    * the delivery-active flag, the destination (when set) and the transacted flag.
+    *
+    * @return The string representation
+    */
+   @Override
+   public String toString()
+   {
+      final StringBuilder text = new StringBuilder(ActiveMQActivation.class.getName());
+      text.append('(');
+      text.append("spec=");
+      text.append(spec.getClass().getName());
+      text.append(" mepf=");
+      text.append(endpointFactory.getClass().getName());
+      text.append(" active=");
+      text.append(deliveryActive.get());
+      if (spec.getDestination() != null)
+      {
+         text.append(" destination=");
+         text.append(spec.getDestination());
+      }
+      text.append(" transacted=");
+      text.append(isDeliveryTransacted);
+      text.append(')');
+      return text.toString();
+   }
+
+   /**
+    * Handles any failure by trying to reconnect
+    * <p>
+    * The failure is logged first. Then, unless another thread is already inside the recovery
+    * loop, the activation is torn down and set up again, sleeping for the spec's setup interval
+    * before each attempt. The loop ends when setup succeeds, when delivery is no longer active,
+    * when the configured number of setup attempts is used up (-1 means unlimited) or when the
+    * thread is interrupted; in the last case the interrupt status is restored for the caller.
+    *
+    * @param failure the reason for the failure
+    */
+   public void handleFailure(Throwable failure)
+   {
+      if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST)
+      {
+         ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
+      }
+      else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.NOT_CONNECTED)
+      {
+         ActiveMQRALogger.LOGGER.awaitingJMSServerCreation();
+      }
+      else
+      {
+         ActiveMQRALogger.LOGGER.failureInActivation(failure, spec);
+      }
+      int reconnectCount = 0;
+      int setupAttempts = spec.getSetupAttempts();
+      long setupInterval = spec.getSetupInterval();
+
+      // Only enter the failure loop once
+      if (inFailure.getAndSet(true))
+      {
+         return;
+      }
+      try
+      {
+         Throwable lastException = failure;
+         while (deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
+         {
+            teardown();
+
+            try
+            {
+               Thread.sleep(setupInterval);
+            }
+            catch (InterruptedException e)
+            {
+               ActiveMQRALogger.LOGGER.debug("Interrupted trying to reconnect " + spec, e);
+               // Thread.sleep() cleared the interrupt status; restore it so that the code
+               // running this thread can still see that an interruption was requested.
+               Thread.currentThread().interrupt();
+               break;
+            }
+
+            if (reconnectCount < 1)
+            {
+               ActiveMQRALogger.LOGGER.attemptingReconnect(spec);
+            }
+            try
+            {
+               setup();
+               ActiveMQRALogger.LOGGER.reconnected();
+               break;
+            }
+            catch (Throwable t)
+            {
+               // The "awaiting ..." messages are only repeated when the retry fails in a
+               // different way than expected for the original failure, to avoid flooding the log.
+               if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST)
+               {
+                  if (lastException == null || !(t instanceof ActiveMQNonExistentQueueException))
+                  {
+                     lastException = t;
+                     ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
+                  }
+               }
+               else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.NOT_CONNECTED)
+               {
+                  if (lastException == null || !(t instanceof ActiveMQNotConnectedException))
+                  {
+                     lastException = t;
+                     ActiveMQRALogger.LOGGER.awaitingJMSServerCreation();
+                  }
+               }
+               else
+               {
+                  ActiveMQRALogger.LOGGER.errorReconnecting(t, spec);
+               }
+            }
+            ++reconnectCount;
+         }
+      }
+      finally
+      {
+         // Leaving failure recovery loop
+         inFailure.set(false);
+      }
+   }
+
+   /**
+    * Returns the connection factory currently used by this activation.
+    *
+    * @return the factory; {@code null} before setupCF() has run or after teardown() has closed it
+    */
+   public ActiveMQConnectionFactory getConnectionFactory()
+   {
+      return factory;
+   }
+
+   /**
+    * Work item scheduled by start(): runs setup() and hands any failure to handleFailure()
+    */
+   private class SetupActivation implements Work
+   {
+      @Override
+      public void run()
+      {
+         try
+         {
+            setup();
+         }
+         catch (Throwable setupFailure)
+         {
+            handleFailure(setupFailure);
+         }
+      }
+
+      @Override
+      public void release()
+      {
+         // nothing to release
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivationSpec.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivationSpec.java
 
b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivationSpec.java
new file mode 100644
index 0000000..06f9c7a
--- /dev/null
+++ 
b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivationSpec.java
@@ -0,0 +1,945 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra.inflow;
+
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.resource.ResourceException;
+import javax.resource.spi.ActivationSpec;
+import javax.resource.spi.InvalidPropertyException;
+import javax.resource.spi.ResourceAdapter;
+import java.beans.IntrospectionException;
+import java.beans.PropertyDescriptor;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.activemq.ra.ConnectionFactoryProperties;
+import org.apache.activemq.ra.ActiveMQRALogger;
+import org.apache.activemq.ra.ActiveMQRaUtils;
+import org.apache.activemq.ra.ActiveMQResourceAdapter;
+
+/**
+ * The activation spec
+ * These properties are set on the MDB activation properties
+ *
+ * @author <a href="adr...@jboss.com">Adrian Brock</a>
+ * @author <a href="jesper.peder...@jboss.org">Jesper Pedersen</a>
+ * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suco...@jboss.org">Clebert Suconic</a>
+ */
+public class ActiveMQActivationSpec extends ConnectionFactoryProperties 
implements ActivationSpec, Serializable
+{
+   private static final long serialVersionUID = -7997041053897964654L;
+
+   /**
+    * The value the constructor assigns to maxSession
+    */
+   private static final int DEFAULT_MAX_SESSION = 15;
+
+   /**
+    * Whether trace is enabled; evaluated once, when this class is initialized
+    */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+
+   // NOTE(review): the two public fields below are not referenced in the visible part of
+   // this class - confirm who reads them before changing their visibility
+   public String strConnectorClassName;
+
+   public String strConnectionParameters;
+
+   /**
+    * The resource adapter
+    */
+   private ActiveMQResourceAdapter ra;
+
+   /**
+    * The connection factory lookup
+    */
+   private String connectionFactoryLookup;
+
+   /**
+    * The destination
+    */
+   private String destination;
+
+   /**
+    * The destination type
+    */
+   private String destinationType;
+
+   /**
+    * The message selector
+    */
+   private String messageSelector;
+
+   /**
+    * The acknowledgement mode; the constructor sets it to Session.AUTO_ACKNOWLEDGE
+    */
+   private int acknowledgeMode;
+
+   /**
+    * The subscription durability
+    */
+   private boolean subscriptionDurability;
+
+   /**
+    * The subscription name
+    */
+   private String subscriptionName;
+
+   /**
+    * If this is true, a durable subscription could be shared by multiple MDB instances
+    */
+   private boolean shareSubscriptions;
+
+   /**
+    * The user
+    */
+   private String user;
+
+   /**
+    * The password
+    */
+   private String password;
+
+   /**
+    * The maximum number of sessions; the constructor sets it to DEFAULT_MAX_SESSION
+    */
+   private Integer maxSession;
+
+   /**
+    * Transaction timeout; the constructor sets it to 0
+    */
+   private Integer transactionTimeout;
+
+   // Whether the destination is looked up in JNDI; isUseJNDI() falls back to the resource adapter when this is null
+   private Boolean useJNDI = true;
+
+   // The raw JNDI parameters; getJndiParams() falls back to the resource adapter when this is null
+   private String jndiParams = null;
+
+   // The table parsed from jndiParams by setJndiParams()
+   private Hashtable parsedJndiParams;
+
+   /* use local tx instead of XA*/
+   private Boolean localTx;
+
+   // undefined by default, default is specified at the RA level in ActiveMQRAProperties
+   private Integer setupAttempts;
+
+   // undefined by default, default is specified at the RA level in ActiveMQRAProperties
+   private Long setupInterval;
+
+   /**
+    * Creates a spec with no resource adapter, destination, selector, subscription or
+    * credentials, using auto-acknowledge, the default maximum number of sessions and a
+    * transaction timeout of 0.
+    */
+   public ActiveMQActivationSpec()
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("constructor()");
+      }
+
+      // Nothing is bound or configured yet
+      this.ra = null;
+      this.destination = null;
+      this.destinationType = null;
+      this.messageSelector = null;
+      this.subscriptionName = null;
+      this.subscriptionDurability = false;
+      this.user = null;
+      this.password = null;
+
+      // Explicit defaults
+      this.acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+      this.maxSession = DEFAULT_MAX_SESSION;
+      this.transactionTimeout = 0;
+   }
+
+   /**
+    * Returns the resource adapter this spec is bound to.
+    *
+    * @return The resource adapter, or {@code null} when none has been set
+    */
+   public ResourceAdapter getResourceAdapter()
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getResourceAdapter()");
+      }
+
+      return this.ra;
+   }
+
+   /**
+    * Tells whether the destination is looked up in JNDI.
+    *
+    * @return the useJNDI value of this spec, or the resource adapter's value when this spec has none
+    */
+   public boolean isUseJNDI()
+   {
+      if (useJNDI != null)
+      {
+         return useJNDI;
+      }
+      // Not set on the spec: defer to the resource adapter
+      return ra.isUseJNDI();
+   }
+
+   /**
+    * Sets whether the destination is looked up in JNDI.
+    *
+    * @param value the useJNDI to set
+    */
+   public void setUseJNDI(final boolean value)
+   {
+      this.useJNDI = value;
+   }
+
+   /**
+    * Returns the raw JNDI parameters.
+    *
+    * @return the jndi params of this spec, or the resource adapter's when this spec has none
+    */
+   public String getJndiParams()
+   {
+      if (jndiParams != null)
+      {
+         return jndiParams;
+      }
+      // Not set on the spec: defer to the resource adapter
+      return ra.getJndiParams();
+   }
+
+   /**
+    * Stores the raw JNDI parameters together with the table parsed from them.
+    *
+    * @param jndiParams the jndi params to set
+    */
+   public void setJndiParams(final String jndiParams)
+   {
+      this.jndiParams = jndiParams;
+      this.parsedJndiParams = ActiveMQRaUtils.parseHashtableConfig(jndiParams);
+   }
+
+   /**
+    * Returns the parsed JNDI parameters.
+    *
+    * @return the table parsed from this spec's jndi params, or the resource adapter's when this spec has none
+    */
+   public Hashtable<?, ?> getParsedJndiParams()
+   {
+      if (parsedJndiParams != null)
+      {
+         return parsedJndiParams;
+      }
+      // Not set on the spec: defer to the resource adapter
+      return ra.getParsedJndiParams();
+   }
+
+   /**
+    * Binds this spec to a resource adapter, which must be an ActiveMQResourceAdapter.
+    *
+    * @param ra The resource adapter
+    * @throws ResourceException Thrown if the adapter is {@code null} or of another type
+    */
+   public void setResourceAdapter(final ResourceAdapter ra) throws ResourceException
+   {
+      if (trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setResourceAdapter(" + ra + ")");
+      }
+
+      // instanceof is false for null, so this rejects a missing adapter as well
+      if (!(ra instanceof ActiveMQResourceAdapter))
+      {
+         throw new ResourceException("Resource adapter is " + ra);
+      }
+
+      this.ra = (ActiveMQResourceAdapter) ra;
+   }
+
+   /**
+    * Get the connection factory lookup
+    *
+    * @return The value
+    */
+   public String getConnectionFactoryLookup()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getConnectionFactoryLookup() ->" + 
connectionFactoryLookup);
+      }
+
+      return connectionFactoryLookup;
+   }
+
+   /**
+    * Set the connection factory lookup
+    *
+    * @param value The value
+    */
+   public void setConnectionFactoryLookup(final String value)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setConnectionFactoryLookup(" + value + 
")");
+      }
+
+      connectionFactoryLookup = value;
+   }
+
+   /**
+    * Get the destination
+    *
+    * @return The value
+    */
+   public String getDestination()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getDestination()");
+      }
+
+      return destination;
+   }
+
+   /**
+    * Set the destination
+    *
+    * @param value The value
+    */
+   public void setDestination(final String value)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setDestination(" + value + ")");
+      }
+
+      destination = value;
+   }
+
+   /**
+    * Get the destination lookup
+    *
+    * @return The value
+    */
+   public String getDestinationLookup()
+   {
+      return getDestination();
+   }
+
+   /**
+    * Set the destination lookup
+    * <p>
+    * The value is stored as the destination (see {@link #setDestination(String)})
+    * and useJNDI is switched on as a side effect.
+    *
+    * @param value The value
+    */
+   public void setDestinationLookup(final String value)
+   {
+      setDestination(value);
+      setUseJNDI(true);
+   }
+
+   /**
+    * Get the destination type
+    *
+    * @return The value
+    */
+   public String getDestinationType()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getDestinationType()");
+      }
+
+      return destinationType;
+   }
+
+   /**
+    * Set the destination type
+    *
+    * @param value The value
+    */
+   public void setDestinationType(final String value)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setDestinationType(" + value + ")");
+      }
+
+      destinationType = value;
+   }
+
+   /**
+    * Get the message selector
+    *
+    * @return The value
+    */
+   public String getMessageSelector()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getMessageSelector()");
+      }
+
+      return messageSelector;
+   }
+
+   /**
+    * Set the message selector
+    *
+    * @param value The value
+    */
+   public void setMessageSelector(final String value)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setMessageSelector(" + value + ")");
+      }
+
+      messageSelector = value;
+   }
+
+   /**
+    * Get the acknowledge mode
+    *
+    * @return The value
+    */
+   public String getAcknowledgeMode()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getAcknowledgeMode()");
+      }
+
+      if (Session.DUPS_OK_ACKNOWLEDGE == acknowledgeMode)
+      {
+         return "Dups-ok-acknowledge";
+      }
+      else
+      {
+         return "Auto-acknowledge";
+      }
+   }
+
+   /**
+    * Set the acknowledge mode
+    *
+    * @param value The value
+    */
+   public void setAcknowledgeMode(final String value)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setAcknowledgeMode(" + value + ")");
+      }
+
+      if ("DUPS_OK_ACKNOWLEDGE".equalsIgnoreCase(value) || 
"Dups-ok-acknowledge".equalsIgnoreCase(value))
+      {
+         acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE;
+      }
+      else if ("AUTO_ACKNOWLEDGE".equalsIgnoreCase(value) || 
"Auto-acknowledge".equalsIgnoreCase(value))
+      {
+         acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+      }
+      else
+      {
+         throw new IllegalArgumentException("Unsupported acknowledgement mode 
" + value);
+      }
+   }
+
+   /**
+    * Get the acknowledge mode as the numeric session constant
+    *
+    * @return the acknowledgement mode
+    */
+   public int getAcknowledgeModeInt()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         // name this method in the trace so it can be told apart from getAcknowledgeMode()
+         ActiveMQRALogger.LOGGER.trace("getAcknowledgeModeInt()");
+      }
+
+      return acknowledgeMode;
+   }
+
+   /**
+    * Get the subscription durability
+    *
+    * @return The value
+    */
+   public String getSubscriptionDurability()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getSubscriptionDurability()");
+      }
+
+      if (subscriptionDurability)
+      {
+         return "Durable";
+      }
+      else
+      {
+         return "NonDurable";
+      }
+   }
+
+   /**
+    * Set the subscription durability
+    *
+    * @param value The value
+    */
+   public void setSubscriptionDurability(final String value)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setSubscriptionDurability(" + value + 
")");
+      }
+
+      subscriptionDurability = "Durable".equals(value);
+   }
+
+   /**
+    * Get the status of subscription durability
+    *
+    * @return The value
+    */
+   public boolean isSubscriptionDurable()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("isSubscriptionDurable()");
+      }
+
+      return subscriptionDurability;
+   }
+
+   /**
+    * Get the subscription name
+    *
+    * @return The value
+    */
+   public String getSubscriptionName()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getSubscriptionName()");
+      }
+
+      return subscriptionName;
+   }
+
+   /**
+    * Set the subscription name
+    *
+    * @param value The value
+    */
+   public void setSubscriptionName(final String value)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setSubscriptionName(" + value + ")");
+      }
+
+      subscriptionName = value;
+   }
+
+
+   /**
+    * @return the shareDurableSubscriptions
+    */
+   public boolean isShareSubscriptions()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("isShareSubscriptions() = " + 
shareSubscriptions);
+      }
+
+      return shareSubscriptions;
+   }
+
+   /**
+    * @param shareSubscriptions the shareDurableSubscriptions to set
+    */
+   public void setShareSubscriptions(boolean shareSubscriptions)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setShareSubscriptions(" + 
shareSubscriptions + ")");
+      }
+
+      this.shareSubscriptions = shareSubscriptions;
+   }
+
+   /**
+    * Get the user
+    *
+    * @return The value
+    */
+   public String getUser()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getUser()");
+      }
+
+      if (user == null)
+      {
+         return ra.getUserName();
+      }
+      else
+      {
+         return user;
+      }
+   }
+
+   /**
+    * Set the user
+    *
+    * @param value The value
+    */
+   public void setUser(final String value)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setUser(" + value + ")");
+      }
+
+      user = value;
+   }
+
+   /**
+    * Get the password
+    *
+    * @return The value
+    */
+   public String getPassword()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getPassword()");
+      }
+
+      if (password == null)
+      {
+         return ra.getPassword();
+      }
+      else
+      {
+         return password;
+      }
+   }
+
+   public String getOwnPassword()
+   {
+      return password;
+   }
+
+   /**
+    * Set the password
+    *
+    * @param value The value
+    */
+   public void setPassword(final String value) throws Exception
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setPassword(****)");
+      }
+
+      password = value;
+   }
+
+   /**
+    * Get the number of max session
+    *
+    * @return The configured value, or DEFAULT_MAX_SESSION when it has been set to null
+    */
+   public Integer getMaxSession()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getMaxSession()");
+      }
+
+      if (maxSession != null)
+      {
+         return maxSession;
+      }
+
+      return DEFAULT_MAX_SESSION;
+   }
+
+   /**
+    * Set the number of max session
+    *
+    * @param value The value
+    */
+   public void setMaxSession(final Integer value)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setMaxSession(" + value + ")");
+      }
+
+      maxSession = value;
+   }
+
+   /**
+    * Get the transaction timeout
+    *
+    * @return The value; 0 when the timeout has been set to null, so the result is never null
+    */
+   public Integer getTransactionTimeout()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getTransactionTimeout()");
+      }
+
+      // The message handler compares this value numerically ("> 0"), which would
+      // throw on a null Integer; guard it the same way getMaxSession() does.
+      if (transactionTimeout == null)
+      {
+         return 0;
+      }
+
+      return transactionTimeout;
+   }
+
+   /**
+    * Set the transaction timeout
+    *
+    * @param value The value
+    */
+   public void setTransactionTimeout(final Integer value)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setTransactionTimeout(" + value + ")");
+      }
+
+      transactionTimeout = value;
+   }
+
+   public Boolean isUseLocalTx()
+   {
+      if (localTx == null)
+      {
+         return ra.getUseLocalTx();
+      }
+      else
+      {
+         return localTx;
+      }
+   }
+
+   public void setUseLocalTx(final Boolean localTx)
+   {
+      this.localTx = localTx;
+   }
+
+   public int getSetupAttempts()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getSetupAttempts()");
+      }
+
+      if (setupAttempts == null)
+      {
+         return ra.getSetupAttempts();
+      }
+      else
+      {
+         return setupAttempts;
+      }
+   }
+
+   public void setSetupAttempts(int setupAttempts)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setSetupAttempts(" + setupAttempts + 
")");
+      }
+
+      this.setupAttempts = setupAttempts;
+   }
+
+   public long getSetupInterval()
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("getSetupInterval()");
+      }
+
+      if (setupInterval == null)
+      {
+         return ra.getSetupInterval();
+      }
+      else
+      {
+         return setupInterval;
+      }
+   }
+
+   public void setSetupInterval(long setupInterval)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setSetupInterval(" + setupInterval + 
")");
+      }
+
+      this.setupInterval = setupInterval;
+   }
+
+   /**
+    * Validate
+    * <p>
+    * Checks that a destination is set, that the destination type (when set) is
+    * javax.jms.Topic or javax.jms.Queue, and that a durable subscription on a
+    * topic (or on an unspecified destination type) has a subscription name.
+    *
+    * @throws InvalidPropertyException Thrown if a validation exception occurs; the exception
+    *                                  carries the descriptors of the offending properties
+    */
+   public void validate() throws InvalidPropertyException
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("validate()");
+      }
+
+      List<String> errorMessages = new ArrayList<String>();
+      List<PropertyDescriptor> propsNotSet = new ArrayList<PropertyDescriptor>();
+
+      try
+      {
+         if (destination == null || destination.trim().equals(""))
+         {
+            propsNotSet.add(new PropertyDescriptor("destination", ActiveMQActivationSpec.class));
+            errorMessages.add("Destination is mandatory.");
+         }
+
+         if (destinationType != null && !Topic.class.getName().equals(destinationType) && !Queue.class.getName().equals(destinationType))
+         {
+            propsNotSet.add(new PropertyDescriptor("destinationType", ActiveMQActivationSpec.class));
+            errorMessages.add("If set, the destinationType must be either 'javax.jms.Topic' or 'javax.jms.Queue'.");
+         }
+
+         if ((destinationType == null || destinationType.length() == 0 || Topic.class.getName().equals(destinationType)) && isSubscriptionDurable() && (subscriptionName == null || subscriptionName.length() == 0))
+         {
+            propsNotSet.add(new PropertyDescriptor("subscriptionName", ActiveMQActivationSpec.class));
+            errorMessages.add("If subscription is durable then subscription name must be specified.");
+         }
+      }
+      catch (IntrospectionException e)
+      {
+         // Report through the adapter's logger instead of dumping the stack trace to stderr
+         ActiveMQRALogger.LOGGER.warn("Unable to introspect activation spec properties during validation", e);
+      }
+
+      if (!propsNotSet.isEmpty())
+      {
+         StringBuilder b = new StringBuilder("Invalid settings:");
+         for (String errorMessage : errorMessages)
+         {
+            b.append(" ");
+            b.append(errorMessage);
+         }
+         InvalidPropertyException e = new InvalidPropertyException(b.toString());
+         final PropertyDescriptor[] descriptors = propsNotSet.toArray(new PropertyDescriptor[propsNotSet.size()]);
+         e.setInvalidPropertyDescriptors(descriptors);
+         throw e;
+      }
+   }
+
+   public String getConnectorClassName()
+   {
+      return strConnectorClassName;
+   }
+
+   public void setConnectorClassName(final String connectorClassName)
+   {
+      if (ActiveMQActivationSpec.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setConnectorClassName(" + 
connectorClassName + ")");
+      }
+
+      strConnectorClassName = connectorClassName;
+
+      
setParsedConnectorClassNames(ActiveMQRaUtils.parseConnectorConnectorConfig(connectorClassName));
+   }
+
+   /**
+    * @return the connectionParameters
+    */
+   public String getConnectionParameters()
+   {
+      return strConnectionParameters;
+   }
+
+   public void setConnectionParameters(final String configuration)
+   {
+      strConnectionParameters = configuration;
+      
setParsedConnectionParameters(ActiveMQRaUtils.parseConfig(configuration));
+   }
+
+   /**
+    * Get a string representation
+    * <p>
+    * Optional settings (connection factory lookup, selector, subscription name)
+    * are only printed when set, and the password is always masked.
+    *
+    * @return The value
+    */
+   @Override
+   public String toString()
+   {
+      StringBuilder buffer = new StringBuilder();
+      buffer.append(ActiveMQActivationSpec.class.getName()).append('(');
+      buffer.append("ra=").append(ra);
+      // guard on the field that is actually printed (was testing messageSelector)
+      if (connectionFactoryLookup != null)
+      {
+         buffer.append(" connectionFactoryLookup=").append(connectionFactoryLookup);
+      }
+      buffer.append(" destination=").append(destination);
+      buffer.append(" destinationType=").append(destinationType);
+      if (messageSelector != null)
+      {
+         buffer.append(" selector=").append(messageSelector);
+      }
+      buffer.append(" ack=").append(getAcknowledgeMode());
+      buffer.append(" durable=").append(subscriptionDurability);
+      buffer.append(" clientID=").append(getClientID());
+      if (subscriptionName != null)
+      {
+         buffer.append(" subscription=").append(subscriptionName);
+      }
+      buffer.append(" user=").append(user);
+      if (password != null)
+      {
+         // never print the real password
+         buffer.append(" password=").append("****");
+      }
+      buffer.append(" maxSession=").append(maxSession);
+      buffer.append(')');
+      return buffer.toString();
+   }
+
+   // The empty setters below are kept only for backwards compatibility
+   public void setUseDLQ(final boolean b)
+   {
+   }
+
+   public void setDLQJNDIName(final String name)
+   {
+   }
+
+   public void setDLQHandler(final String handler)
+   {
+   }
+
+   public void setDLQMaxResent(final int maxResent)
+   {
+   }
+
+   public void setProviderAdapterJNDI(final String jndi)
+   {
+   }
+
+   /**
+    * @param keepAlive the keepAlive to set
+    */
+   public void setKeepAlive(boolean keepAlive)
+   {
+   }
+
+   /**
+    * @param keepAliveMillis the keepAliveMillis to set
+    */
+   public void setKeepAliveMillis(long keepAliveMillis)
+   {
+   }
+
+
+   public void setReconnectInterval(long interval)
+   {
+   }
+
+   public void setMinSession(final Integer value)
+   {
+   }
+
+   public void setMaxMessages(final Integer value)
+   {
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQMessageHandler.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQMessageHandler.java
 
b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQMessageHandler.java
new file mode 100644
index 0000000..abd68d0
--- /dev/null
+++ 
b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQMessageHandler.java
@@ -0,0 +1,430 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.ra.inflow;
+
+import javax.jms.MessageListener;
+import javax.resource.ResourceException;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+import java.util.UUID;
+
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientSession.QueueQuery;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.api.core.client.MessageHandler;
+import org.apache.activemq.core.client.impl.ClientConsumerInternal;
+import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.jms.client.ActiveMQDestination;
+import org.apache.activemq.jms.client.ActiveMQMessage;
+import org.apache.activemq.ra.ActiveMQRALogger;
+import org.apache.activemq.ra.ActiveMQResourceAdapter;
+import org.apache.activemq.ra.ActiveMQXAResourceWrapper;
+import org.apache.activemq.utils.FutureLatch;
+
+/**
+ * The message handler
+ *
+ * @author <a href="adr...@jboss.com">Adrian Brock</a>
+ * @author <a href="mailto:jesper.peder...@jboss.org">Jesper Pedersen</a>
+ * @author <a href="mailto:andy.tay...@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:mtay...@redhat.com">Martyn Taylor</a>
+ */
+public class ActiveMQMessageHandler implements MessageHandler
+{
+   /**
+    * Trace enabled
+    */
+   private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
+   /**
+    * The session
+    */
+   private final ClientSessionInternal session;
+
+   private ClientConsumerInternal consumer;
+
+   /**
+    * The endpoint
+    */
+   private MessageEndpoint endpoint;
+
+   private final ActiveMQActivation activation;
+
+   private boolean useLocalTx;
+
+   private boolean transacted;
+
+   private boolean useXA = false;
+
+   private final int sessionNr;
+
+   private final TransactionManager tm;
+
+   private ClientSessionFactory cf;
+
+   /**
+    * Creates a handler. The arguments are only stored here; the consumer and
+    * the endpoint are created later by {@link #setup()}.
+    *
+    * @param activation the activation this handler reads its spec, address and endpoint factory from
+    * @param tm         the transaction manager; it is null-checked before use in onMessage
+    * @param session    the core session used to create queues and the consumer
+    * @param cf         the session factory; it is closed by {@link #teardown()}
+    * @param sessionNr  number of this session; setup() only checks for existing subscribers when it is 0
+    */
+   public ActiveMQMessageHandler(final ActiveMQActivation activation,
+                                 final TransactionManager tm,
+                                 final ClientSessionInternal session,
+                                 final ClientSessionFactory cf,
+                                 final int sessionNr)
+   {
+      this.activation = activation;
+      this.session = session;
+      this.cf = cf;
+      this.sessionNr = sessionNr;
+      this.tm = tm;
+   }
+
+   /**
+    * Creates the consumer and the message endpoint and registers this handler
+    * on the consumer.
+    * <p>
+    * For a durable topic subscription the subscription queue is created when it
+    * does not exist, and deleted and re-created when its filter or address no
+    * longer match the activation. For a non-durable topic a temporary queue,
+    * whose name is shared through the activation, is used. In every other case
+    * the consumer is created directly on the activation address.
+    *
+    * @throws Exception if a session or endpoint factory call fails, or if the durable subscription
+    *                   already has subscribers and the spec does not allow sharing subscriptions
+    */
+   public void setup() throws Exception
+   {
+      if (ActiveMQMessageHandler.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("setup()");
+      }
+
+      ActiveMQActivationSpec spec = activation.getActivationSpec();
+      String selector = spec.getMessageSelector();
+
+      // Create the message consumer; a null or blank selector means "no filter"
+      SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
+      if (activation.isTopic() && spec.isSubscriptionDurable())
+      {
+         SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true,
+                                                                                                             spec.getClientID(),
+                                                                                                             spec.getSubscriptionName()));
+
+         QueueQuery subResponse = session.queueQuery(queueName);
+
+         if (!subResponse.isExists())
+         {
+            session.createQueue(activation.getAddress(), queueName, selectorString, true);
+         }
+         else
+         {
+            // The check for already exists should be done only at the first session
+            // As a deployed MDB could set up multiple instances in order to process messages in parallel.
+            if (sessionNr == 0 && subResponse.getConsumerCount() > 0)
+            {
+               if (!spec.isShareSubscriptions())
+               {
+                  throw new javax.jms.IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+               }
+               else if (ActiveMQRALogger.LOGGER.isDebugEnabled())
+               {
+                  ActiveMQRALogger.LOGGER.debug("the mdb on destination " + queueName + " already had " +
+                                                   subResponse.getConsumerCount() +
+                                                   " consumers but the MDB is configured to share subscriptions, so no exceptions are thrown");
+               }
+            }
+
+            SimpleString oldFilterString = subResponse.getFilterString();
+
+            // Compare against the normalised selector that is actually used to create the queue.
+            // Comparing the raw selector made a blank selector look like a change every time,
+            // which deleted and re-created the durable subscription on each setup.
+            String oldFilter = oldFilterString == null ? null : oldFilterString.toString();
+            String newFilter = selectorString == null ? null : selector;
+            boolean selectorChanged = oldFilter == null ? newFilter != null : !oldFilter.equals(newFilter);
+
+            SimpleString oldTopicName = subResponse.getAddress();
+
+            boolean topicChanged = !oldTopicName.equals(activation.getAddress());
+
+            if (selectorChanged || topicChanged)
+            {
+               // Delete the old durable sub
+               session.deleteQueue(queueName);
+
+               // Create the new one
+               session.createQueue(activation.getAddress(), queueName, selectorString, true);
+            }
+         }
+         // the filter lives on the subscription queue itself, so the consumer is created without one
+         consumer = (ClientConsumerInternal) session.createConsumer(queueName, null, false);
+      }
+      else
+      {
+         SimpleString tempQueueName;
+         if (activation.isTopic())
+         {
+            if (activation.getTopicTemporaryQueue() == null)
+            {
+               tempQueueName = new SimpleString(UUID.randomUUID().toString());
+               session.createTemporaryQueue(activation.getAddress(), tempQueueName, selectorString);
+               activation.setTopicTemporaryQueue(tempQueueName);
+            }
+            else
+            {
+               tempQueueName = activation.getTopicTemporaryQueue();
+               QueueQuery queueQuery = session.queueQuery(tempQueueName);
+               if (!queueQuery.isExists())
+               {
+                  // this is because we could be using remote servers (in cluster maybe)
+                  // and the queue wasn't created on that node yet.
+                  session.createTemporaryQueue(activation.getAddress(), tempQueueName, selectorString);
+               }
+            }
+         }
+         else
+         {
+            tempQueueName = activation.getAddress();
+         }
+         consumer = (ClientConsumerInternal) session.createConsumer(tempQueueName, selectorString);
+      }
+
+      // Create the endpoint, if we are transacted pass the session so it is enlisted, unless using Local TX
+      MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
+      useLocalTx = !activation.isDeliveryTransacted() && activation.getActivationSpec().isUseLocalTx();
+      transacted = activation.isDeliveryTransacted();
+      if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx())
+      {
+         XAResource xaResource = new ActiveMQXAResourceWrapper(session,
+                                                               ((ActiveMQResourceAdapter) spec.getResourceAdapter()).getJndiName(),
+                                                               ((ClientSessionFactoryInternal) cf).getLiveNodeId());
+         endpoint = endpointFactory.createEndpoint(xaResource);
+         useXA = true;
+      }
+      else
+      {
+         endpoint = endpointFactory.createEndpoint(null);
+         useXA = false;
+      }
+      consumer.setMessageHandler(this);
+   }
+
+   /**
+    * Returns the XA resource of this handler.
+    *
+    * @return the session when setup() chose XA delivery (useXA is true), otherwise null
+    */
+   XAResource getXAResource()
+   {
+      return useXA ? session : null;
+   }
+
+   /**
+    * Asks the consumer to prepare for close.
+    * <p>
+    * Failures are logged and not propagated.
+    *
+    * @param future passed through to the consumer's prepareForClose
+    * @return whatever the consumer's prepareForClose returns, or null when there is
+    *         no consumer or the call failed
+    */
+   public Thread interruptConsumer(FutureLatch future)
+   {
+      try
+      {
+         if (consumer != null)
+         {
+            return consumer.prepareForClose(future);
+         }
+      }
+      catch (Throwable e)
+      {
+         // keep the cause in the log, otherwise the failure cannot be diagnosed
+         ActiveMQRALogger.LOGGER.warn("Error interrupting handler on endpoint " + endpoint + " handler=" + consumer, e);
+      }
+      return null;
+   }
+
+   /**
+    * Stop the handler
+    * <p>
+    * Releases the endpoint, closes the consumer, removes the activation's temporary
+    * topic queue when it has no consumers left, then closes the session and the
+    * session factory. Each step is attempted independently: a failure is logged at
+    * debug level and the remaining steps still run.
+    */
+   public void teardown()
+   {
+      if (ActiveMQMessageHandler.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("teardown()");
+      }
+
+      try
+      {
+         if (endpoint != null)
+         {
+            endpoint.release();
+            endpoint = null;
+         }
+      }
+      catch (Throwable t)
+      {
+         ActiveMQRALogger.LOGGER.debug("Error releasing endpoint " + endpoint, t);
+      }
+
+      try
+      {
+         // the consumer is still null when setup() failed before creating it
+         if (consumer != null)
+         {
+            consumer.close();
+         }
+         if (activation.getTopicTemporaryQueue() != null)
+         {
+            // We need to delete temporary topics when the activation is stopped or messages will build up on the server
+            SimpleString tmpQueue = activation.getTopicTemporaryQueue();
+            QueueQuery subResponse = session.queueQuery(tmpQueue);
+            if (subResponse.getConsumerCount() == 0)
+            {
+               // This is optional really, since we now use temporaryQueues, we could simply ignore this
+               // and the server temporary queue would remove this as soon as the queue was removed
+               session.deleteQueue(tmpQueue);
+            }
+         }
+      }
+      catch (Throwable t)
+      {
+         ActiveMQRALogger.LOGGER.debug("Error closing core-queue consumer", t);
+      }
+
+      try
+      {
+         if (session != null)
+         {
+            session.close();
+         }
+      }
+      catch (Throwable t)
+      {
+         ActiveMQRALogger.LOGGER.debug("Error releasing session " + session, t);
+      }
+
+      try
+      {
+         if (cf != null)
+         {
+            cf.close();
+         }
+      }
+      catch (Throwable t)
+      {
+         // report the factory that failed to close (the message used to print the session)
+         ActiveMQRALogger.LOGGER.debug("Error releasing session factory " + cf, t);
+      }
+   }
+
+   /**
+    * Delivers one core message to the message endpoint.
+    * <p>
+    * The message is wrapped into an ActiveMQMessage and handed to the endpoint
+    * between beforeDelivery and afterDelivery. It is acknowledged before the
+    * listener call when delivery is transacted and after it otherwise. With a
+    * local transaction the session is committed once afterDelivery succeeded.
+    * <p>
+    * Any Throwable is logged and not rethrown. If beforeDelivery had already
+    * succeeded, the current transaction is marked rollback-only (XA with a
+    * transaction manager) and afterDelivery is still called. For local or
+    * non-transacted delivery the session is rolled back. The session is reset
+    * if needed in every case.
+    *
+    * @param message the core message to deliver
+    */
+   public void onMessage(final ClientMessage message)
+   {
+      if (ActiveMQMessageHandler.trace)
+      {
+         ActiveMQRALogger.LOGGER.trace("onMessage(" + message + ")");
+      }
+
+      ActiveMQMessage msg = ActiveMQMessage.createMessage(message, session);
+      // remembers whether beforeDelivery succeeded, so afterDelivery is only paired with it
+      boolean beforeDelivery = false;
+
+      try
+      {
+         if (activation.getActivationSpec().getTransactionTimeout() > 0 && tm 
!= null)
+         {
+            
tm.setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout());
+         }
+         endpoint.beforeDelivery(ActiveMQActivation.ONMESSAGE);
+         beforeDelivery = true;
+         msg.doBeforeReceive();
+
+         //In the transacted case the message must be acked *before* onMessage 
is called
+
+         if (transacted)
+         {
+            message.acknowledge();
+         }
+
+         ((MessageListener) endpoint).onMessage(msg);
+
+         if (!transacted)
+         {
+            message.acknowledge();
+         }
+
+         try
+         {
+            endpoint.afterDelivery();
+         }
+         catch (ResourceException e)
+         {
+            ActiveMQRALogger.LOGGER.unableToCallAfterDelivery(e);
+            // returning here skips the local commit below; the finally block still runs
+            return;
+         }
+         if (useLocalTx)
+         {
+            session.commit();
+         }
+
+         if (trace)
+         {
+            ActiveMQRALogger.LOGGER.trace("finished onMessage on " + message);
+         }
+      }
+      catch (Throwable e)
+      {
+         ActiveMQRALogger.LOGGER.errorDeliveringMessage(e);
+         // we need to call before/afterDelivery as a pair
+         if (beforeDelivery)
+         {
+            if (useXA && tm != null)
+            {
+               // This is the job for the container,
+               // however if the container throws an exception because of some 
other errors,
+               // there are situations where the container is not setting the 
rollback only
+               // this is to avoid a scenario where afterDelivery would kick in
+               try
+               {
+                  Transaction tx = tm.getTransaction();
+                  if (tx != null)
+                  {
+                     tx.setRollbackOnly();
+                  }
+               }
+               catch (Exception e1)
+               {
+                  ActiveMQRALogger.LOGGER.warn("unnable to clear the 
transaction", e1);
+                  // marking the transaction failed: fall back to rolling the session back directly
+                  try
+                  {
+                     session.rollback();
+                  }
+                  catch (ActiveMQException e2)
+                  {
+                     ActiveMQRALogger.LOGGER.warn("Unable to rollback", e2);
+                     return;
+                  }
+               }
+            }
+
+            // work on a local copy because teardown() sets the endpoint field to null
+            MessageEndpoint endToUse = endpoint;
+            try
+            {
+               // to avoid a NPE that would happen while the RA is in tearDown
+               if (endToUse != null)
+               {
+                  endToUse.afterDelivery();
+               }
+            }
+            catch (ResourceException e1)
+            {
+               ActiveMQRALogger.LOGGER.unableToCallAfterDelivery(e1);
+            }
+         }
+         if (useLocalTx || !activation.isDeliveryTransacted())
+         {
+            try
+            {
+               session.rollback(true);
+            }
+            catch (ActiveMQException e1)
+            {
+               ActiveMQRALogger.LOGGER.unableToRollbackTX();
+            }
+         }
+      }
+      finally
+      {
+         try
+         {
+            session.resetIfNeeded();
+         }
+         catch (ActiveMQException e)
+         {
+            ActiveMQRALogger.LOGGER.unableToResetSession();
+         }
+      }
+
+   }
+
+   /**
+    * Starts the session this handler was created with.
+    *
+    * @throws ActiveMQException propagated from the session's start call
+    */
+   public void start() throws ActiveMQException
+   {
+      session.start();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/HornetQActivation.java
----------------------------------------------------------------------
diff --git 
a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/HornetQActivation.java
 
b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/HornetQActivation.java
deleted file mode 100644
index 8bf12b6..0000000
--- 
a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/HornetQActivation.java
+++ /dev/null
@@ -1,831 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.apache.activemq.ra.inflow;
-
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.resource.ResourceException;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkManager;
-import javax.transaction.xa.XAResource;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.api.core.ActiveMQExceptionType;
-import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
-import org.apache.activemq.api.core.ActiveMQNotConnectedException;
-import org.apache.activemq.api.core.SimpleString;
-import org.apache.activemq.api.core.client.ClientSession;
-import org.apache.activemq.api.core.client.ClientSessionFactory;
-import org.apache.activemq.api.jms.HornetQJMSClient;
-import org.apache.activemq.core.client.impl.ClientSessionInternal;
-import org.apache.activemq.jms.client.HornetQConnectionFactory;
-import org.apache.activemq.jms.client.HornetQDestination;
-import org.apache.activemq.jms.server.recovery.XARecoveryConfig;
-import org.apache.activemq.ra.HornetQRABundle;
-import org.apache.activemq.ra.HornetQRAConnectionFactory;
-import org.apache.activemq.ra.HornetQRALogger;
-import org.apache.activemq.ra.HornetQRaUtils;
-import org.apache.activemq.ra.HornetQResourceAdapter;
-import org.apache.activemq.utils.FutureLatch;
-import org.apache.activemq.utils.SensitiveDataCodec;
-
-/**
- * The activation.
- *
- * @author <a href="adr...@jboss.com">Adrian Brock</a>
- * @author <a href="jesper.peder...@jboss.org">Jesper Pedersen</a>
- * @author <a href="mailto:andy.tay...@jboss.org";>Andy Taylor</a>
- */
-public class HornetQActivation
-{
-   /**
-    * Trace enabled
-    */
-   private static boolean trace = HornetQRALogger.LOGGER.isTraceEnabled();
-
-   /**
-    * The onMessage method
-    */
-   public static final Method ONMESSAGE;
-
-   /**
-    * The resource adapter
-    */
-   private final HornetQResourceAdapter ra;
-
-   /**
-    * The activation spec
-    */
-   private final HornetQActivationSpec spec;
-
-   /**
-    * The message endpoint factory
-    */
-   private final MessageEndpointFactory endpointFactory;
-
-   /**
-    * Whether delivery is active
-    */
-   private final AtomicBoolean deliveryActive = new AtomicBoolean(false);
-
-   /**
-    * The destination type
-    */
-   private boolean isTopic = false;
-
-   /**
-    * Is the delivery transacted
-    */
-   private boolean isDeliveryTransacted;
-
-   private HornetQDestination destination;
-
-   /**
-    * The name of the temporary subscription name that all the sessions will share
-    */
-   private SimpleString topicTemporaryQueue;
-
-   private final List<HornetQMessageHandler> handlers = new ArrayList<HornetQMessageHandler>();
-
-   private HornetQConnectionFactory factory;
-
-   // Whether we are in the failure recovery loop
-   private final AtomicBoolean inFailure = new AtomicBoolean(false);
-   private XARecoveryConfig resourceRecovery;
-
-   static
-   {
-      try
-      {
-         ONMESSAGE = MessageListener.class.getMethod("onMessage", new Class[]{Message.class});
-      }
-      catch (Exception e)
-      {
-         throw new RuntimeException(e);
-      }
-   }
-
-   /**
-    * Constructor
-    *
-    * @param ra              The resource adapter
-    * @param endpointFactory The endpoint factory
-    * @param spec            The activation spec
-    * @throws ResourceException Thrown if an error occurs
-    */
-   public HornetQActivation(final HornetQResourceAdapter ra,
-                            final MessageEndpointFactory endpointFactory,
-                            final HornetQActivationSpec spec) throws ResourceException
-   {
-      spec.validate();
-
-      if (HornetQActivation.trace)
-      {
-         HornetQRALogger.LOGGER.trace("constructor(" + ra + ", " + endpointFactory + ", " + spec + ")");
-      }
-
-      if (ra.isUseMaskedPassword())
-      {
-         String pass = spec.getOwnPassword();
-         if (pass != null)
-         {
-            SensitiveDataCodec<String> codec = ra.getCodecInstance();
-
-            try
-            {
-               spec.setPassword(codec.decode(pass));
-            }
-            catch (Exception e)
-            {
-               throw new ResourceException(e);
-            }
-         }
-      }
-
-      this.ra = ra;
-      this.endpointFactory = endpointFactory;
-      this.spec = spec;
-      try
-      {
-         isDeliveryTransacted = endpointFactory.isDeliveryTransacted(HornetQActivation.ONMESSAGE);
-      }
-      catch (Exception e)
-      {
-         throw new ResourceException(e);
-      }
-   }
-
-   /**
-    * Get the activation spec
-    *
-    * @return The value
-    */
-   public HornetQActivationSpec getActivationSpec()
-   {
-      if (HornetQActivation.trace)
-      {
-         HornetQRALogger.LOGGER.trace("getActivationSpec()");
-      }
-
-      return spec;
-   }
-
-   /**
-    * Get the message endpoint factory
-    *
-    * @return The value
-    */
-   public MessageEndpointFactory getMessageEndpointFactory()
-   {
-      if (HornetQActivation.trace)
-      {
-         HornetQRALogger.LOGGER.trace("getMessageEndpointFactory()");
-      }
-
-      return endpointFactory;
-   }
-
-   /**
-    * Get whether delivery is transacted
-    *
-    * @return The value
-    */
-   public boolean isDeliveryTransacted()
-   {
-      if (HornetQActivation.trace)
-      {
-         HornetQRALogger.LOGGER.trace("isDeliveryTransacted()");
-      }
-
-      return isDeliveryTransacted;
-   }
-
-   /**
-    * Get the work manager
-    *
-    * @return The value
-    */
-   public WorkManager getWorkManager()
-   {
-      if (HornetQActivation.trace)
-      {
-         HornetQRALogger.LOGGER.trace("getWorkManager()");
-      }
-
-      return ra.getWorkManager();
-   }
-
-   /**
-    * Is the destination a topic
-    *
-    * @return The value
-    */
-   public boolean isTopic()
-   {
-      if (HornetQActivation.trace)
-      {
-         HornetQRALogger.LOGGER.trace("isTopic()");
-      }
-
-      return isTopic;
-   }
-
-   /**
-    * Start the activation
-    *
-    * @throws ResourceException Thrown if an error occurs
-    */
-   public void start() throws ResourceException
-   {
-      if (HornetQActivation.trace)
-      {
-         HornetQRALogger.LOGGER.trace("start()");
-      }
-      deliveryActive.set(true);
-      ra.getWorkManager().scheduleWork(new SetupActivation());
-   }
-
-   /**
-    * @return the topicTemporaryQueue
-    */
-   public SimpleString getTopicTemporaryQueue()
-   {
-      return topicTemporaryQueue;
-   }
-
-   /**
-    * @param topicTemporaryQueue the topicTemporaryQueue to set
-    */
-   public void setTopicTemporaryQueue(SimpleString topicTemporaryQueue)
-   {
-      this.topicTemporaryQueue = topicTemporaryQueue;
-   }
-
-   /**
-    * @return the list of XAResources for this activation endpoint
-    */
-   public List<XAResource> getXAResources()
-   {
-      List<XAResource> xaresources = new ArrayList<XAResource>();
-      for (HornetQMessageHandler handler : handlers)
-      {
-         XAResource xares = handler.getXAResource();
-         if (xares != null)
-         {
-            xaresources.add(xares);
-         }
-      }
-      return xaresources;
-   }
-
-   /**
-    * Stop the activation
-    */
-   public void stop()
-   {
-      if (HornetQActivation.trace)
-      {
-         HornetQRALogger.LOGGER.trace("stop()");
-      }
-
-      deliveryActive.set(false);
-      teardown();
-   }
-
-   /**
-    * Setup the activation
-    *
-    * @throws Exception Thrown if an error occurs
-    */
-   protected synchronized void setup() throws Exception
-   {
-      HornetQRALogger.LOGGER.debug("Setting up " + spec);
-
-      setupCF();
-
-      setupDestination();
-
-      Exception firstException = null;
-
-      for (int i = 0; i < spec.getMaxSession(); i++)
-      {
-         ClientSessionFactory cf = null;
-         ClientSession session = null;
-
-         try
-         {
-            cf = factory.getServerLocator().createSessionFactory();
-            session = setupSession(cf);
-            HornetQMessageHandler handler = new HornetQMessageHandler(this, ra.getTM(), (ClientSessionInternal) session, cf, i);
-            handler.setup();
-            handlers.add(handler);
-         }
-         catch (Exception e)
-         {
-            if (cf != null)
-            {
-               cf.close();
-            }
-            if (session != null)
-            {
-               session.close();
-            }
-            if (firstException == null)
-            {
-               firstException = e;
-            }
-         }
-      }
-      //if we have any exceptions close all the handlers and throw the first exception.
-      //we don't want partially configured activations, i.e. only 8 out of 15 sessions started so best to stop and log the error.
-      if (firstException != null)
-      {
-         for (HornetQMessageHandler handler : handlers)
-         {
-            handler.teardown();
-         }
-         throw firstException;
-      }
-
-      //now start them all together.
-      for (HornetQMessageHandler handler : handlers)
-      {
-         handler.start();
-      }
-
-      resourceRecovery = ra.getRecoveryManager().register(factory, spec.getUser(), spec.getPassword());
-
-      HornetQRALogger.LOGGER.debug("Setup complete " + this);
-   }
-
-   /**
-    * Teardown the activation
-    */
-   protected synchronized void teardown()
-   {
-      HornetQRALogger.LOGGER.debug("Tearing down " + spec);
-
-      if (resourceRecovery != null)
-      {
-         ra.getRecoveryManager().unRegister(resourceRecovery);
-      }
-
-      final HornetQMessageHandler[] handlersCopy = new HornetQMessageHandler[handlers.size()];
-
-      // We need to do from last to first as any temporary queue will have been created on the first handler
-      // So we invert the handlers here
-      for (int i = 0; i < handlers.size(); i++)
-      {
-         // The index here is the complimentary so it's inverting the array
-         handlersCopy[i] = handlers.get(handlers.size() - i - 1);
-      }
-
-      handlers.clear();
-
-      FutureLatch future = new FutureLatch(handlersCopy.length);
-      List<Thread> interruptThreads = new ArrayList<>();
-      for (HornetQMessageHandler handler : handlersCopy)
-      {
-         Thread thread = handler.interruptConsumer(future);
-         if (thread != null)
-         {
-            interruptThreads.add(thread);
-         }
-      }
-
-      //wait for all the consumers to complete any onmessage calls
-      boolean stuckThreads = !future.await(factory.getCallTimeout());
-      //if any are stuck then we need to interrupt them
-      if (stuckThreads)
-      {
-         for (Thread interruptThread : interruptThreads)
-         {
-            try
-            {
-               interruptThread.interrupt();
-            }
-            catch (Exception e)
-            {
-               //ok
-            }
-         }
-      }
-
-      Thread threadTearDown = new Thread("TearDown/HornetQActivation")
-      {
-         public void run()
-         {
-            for (HornetQMessageHandler handler : handlersCopy)
-            {
-               handler.teardown();
-            }
-         }
-      };
-
-      // We will first start a new thread that will call tearDown on all the instances, trying to graciously shutdown everything.
-      // We will then use the call-timeout to determine a timeout.
-      // if that failed we will then close the connection factory, and interrupt the thread
-      threadTearDown.start();
-
-      try
-      {
-         threadTearDown.join(factory.getCallTimeout());
-      }
-      catch (InterruptedException e)
-      {
-         // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
-      }
-
-      if (threadTearDown.isAlive())
-      {
-         if (factory != null)
-         {
-            // This will interrupt any threads waiting on reconnect
-            factory.close();
-            factory = null;
-         }
-         threadTearDown.interrupt();
-
-         try
-         {
-            threadTearDown.join(5000);
-         }
-         catch (InterruptedException e)
-         {
-            // nothing to be done here.. we are going down anyways
-         }
-
-         if (threadTearDown.isAlive())
-         {
-            HornetQRALogger.LOGGER.warn("Thread " + threadTearDown + " couldn't be finished");
-         }
-      }
-
-      if (spec.isHasBeenUpdated() && factory != null)
-      {
-         factory.close();
-         factory = null;
-      }
-
-
-      HornetQRALogger.LOGGER.debug("Tearing down complete " + this);
-   }
-
-   protected void setupCF() throws Exception
-   {
-      if (spec.getConnectionFactoryLookup() != null)
-      {
-         Context ctx;
-         if (spec.getParsedJndiParams() == null)
-         {
-            ctx = new InitialContext();
-         }
-         else
-         {
-            ctx = new InitialContext(spec.getParsedJndiParams());
-         }
-         Object fac = ctx.lookup(spec.getConnectionFactoryLookup());
-         if (fac instanceof HornetQConnectionFactory)
-         {
-            factory = (HornetQConnectionFactory) fac;
-         }
-         else
-         {
-            HornetQRAConnectionFactory raFact = (HornetQRAConnectionFactory) fac;
-            if (spec.isHasBeenUpdated())
-            {
-               factory = raFact.getResourceAdapter().createHornetQConnectionFactory(spec);
-            }
-            else
-            {
-               factory = raFact.getDefaultFactory();
-               if (factory != ra.getDefaultHornetQConnectionFactory())
-               {
-                  HornetQRALogger.LOGGER.warnDifferentConnectionfactory();
-               }
-            }
-         }
-      }
-      else if (spec.isHasBeenUpdated())
-      {
-         factory = ra.createHornetQConnectionFactory(spec);
-      }
-      else
-      {
-         factory = ra.getDefaultHornetQConnectionFactory();
-      }
-   }
-
-   /**
-    * Setup a session
-    *
-    * @param cf
-    * @return The connection
-    * @throws Exception Thrown if an error occurs
-    */
-   protected ClientSession setupSession(ClientSessionFactory cf) throws Exception
-   {
-      ClientSession result = null;
-
-      try
-      {
-         result = ra.createSession(cf,
-                                   spec.getAcknowledgeModeInt(),
-                                   spec.getUser(),
-                                   spec.getPassword(),
-                                   ra.getPreAcknowledge(),
-                                   ra.getDupsOKBatchSize(),
-                                   ra.getTransactionBatchSize(),
-                                   isDeliveryTransacted,
-                                   spec.isUseLocalTx(),
-                                   spec.getTransactionTimeout());
-
-         result.addMetaData("resource-adapter", "inbound");
-         result.addMetaData("jms-session", "");
-         String clientID = ra.getClientID() == null ? spec.getClientID() : ra.getClientID();
-         if (clientID != null)
-         {
-            result.addMetaData("jms-client-id", clientID);
-         }
-
-         HornetQRALogger.LOGGER.debug("Using queue connection " + result);
-
-         return result;
-      }
-      catch (Throwable t)
-      {
-         try
-         {
-            if (result != null)
-            {
-               result.close();
-            }
-         }
-         catch (Exception e)
-         {
-            HornetQRALogger.LOGGER.trace("Ignored error closing connection", e);
-         }
-         if (t instanceof Exception)
-         {
-            throw (Exception) t;
-         }
-         throw new RuntimeException("Error configuring connection", t);
-      }
-   }
-
-   public SimpleString getAddress()
-   {
-      return destination.getSimpleAddress();
-   }
-
-   protected void setupDestination() throws Exception
-   {
-
-      String destinationName = spec.getDestination();
-
-      if (spec.isUseJNDI())
-      {
-         Context ctx;
-         if (spec.getParsedJndiParams() == null)
-         {
-            ctx = new InitialContext();
-         }
-         else
-         {
-            ctx = new InitialContext(spec.getParsedJndiParams());
-         }
-         HornetQRALogger.LOGGER.debug("Using context " + ctx.getEnvironment() + " for " + spec);
-         if (HornetQActivation.trace)
-         {
-            HornetQRALogger.LOGGER.trace("setupDestination(" + ctx + ")");
-         }
-
-         String destinationTypeString = spec.getDestinationType();
-         if (destinationTypeString != null && !destinationTypeString.trim().equals(""))
-         {
-            HornetQRALogger.LOGGER.debug("Destination type defined as " + destinationTypeString);
-
-            Class<?> destinationType;
-            if (Topic.class.getName().equals(destinationTypeString))
-            {
-               destinationType = Topic.class;
-               isTopic = true;
-            }
-            else
-            {
-               destinationType = Queue.class;
-            }
-
-            HornetQRALogger.LOGGER.debug("Retrieving " + destinationType.getName() + " \"" + destinationName + "\" from JNDI");
-
-            try
-            {
-               destination = (HornetQDestination) HornetQRaUtils.lookup(ctx, destinationName, destinationType);
-            }
-            catch (Exception e)
-            {
-               if (destinationName == null)
-               {
-                  throw HornetQRABundle.BUNDLE.noDestinationName();
-               }
-
-               String calculatedDestinationName = destinationName.substring(destinationName.lastIndexOf('/') + 1);
-
-               HornetQRALogger.LOGGER.debug("Unable to retrieve " + destinationName +
-                                               " from JNDI. Creating a new " + destinationType.getName() +
-                                               " named " + calculatedDestinationName + " to be used by the MDB.");
-
-               // If there is no binding on naming, we will just create a new instance
-               if (isTopic)
-               {
-                  destination = (HornetQDestination) HornetQJMSClient.createTopic(calculatedDestinationName);
-               }
-               else
-               {
-                  destination = (HornetQDestination) HornetQJMSClient.createQueue(calculatedDestinationName);
-               }
-            }
-         }
-         else
-         {
-            HornetQRALogger.LOGGER.debug("Destination type not defined in MDB activation configuration.");
-            HornetQRALogger.LOGGER.debug("Retrieving " + Destination.class.getName() + " \"" + destinationName + "\" from JNDI");
-
-            destination = (HornetQDestination) HornetQRaUtils.lookup(ctx, destinationName, Destination.class);
-            if (destination instanceof Topic)
-            {
-               isTopic = true;
-            }
-         }
-      }
-      else
-      {
-         HornetQRALogger.LOGGER.instantiatingDestination(spec.getDestinationType(), spec.getDestination());
-
-         if (Topic.class.getName().equals(spec.getDestinationType()))
-         {
-            destination = (HornetQDestination) HornetQJMSClient.createTopic(spec.getDestination());
-            isTopic = true;
-         }
-         else
-         {
-            destination = (HornetQDestination) HornetQJMSClient.createQueue(spec.getDestination());
-         }
-      }
-   }
-
-   /**
-    * Get a string representation
-    *
-    * @return The value
-    */
-   @Override
-   public String toString()
-   {
-      StringBuffer buffer = new StringBuffer();
-      buffer.append(HornetQActivation.class.getName()).append('(');
-      buffer.append("spec=").append(spec.getClass().getName());
-      buffer.append(" mepf=").append(endpointFactory.getClass().getName());
-      buffer.append(" active=").append(deliveryActive.get());
-      if (spec.getDestination() != null)
-      {
-         buffer.append(" destination=").append(spec.getDestination());
-      }
-      buffer.append(" transacted=").append(isDeliveryTransacted);
-      buffer.append(')');
-      return buffer.toString();
-   }
-
-   /**
-    * Handles any failure by trying to reconnect
-    *
-    * @param failure the reason for the failure
-    */
-   public void handleFailure(Throwable failure)
-   {
-      if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST)
-      {
-         HornetQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
-      }
-      else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.NOT_CONNECTED)
-      {
-         HornetQRALogger.LOGGER.awaitingJMSServerCreation();
-      }
-      else
-      {
-         HornetQRALogger.LOGGER.failureInActivation(failure, spec);
-      }
-      int reconnectCount = 0;
-      int setupAttempts = spec.getSetupAttempts();
-      long setupInterval = spec.getSetupInterval();
-
-      // Only enter the failure loop once
-      if (inFailure.getAndSet(true))
-         return;
-      try
-      {
-         Throwable lastException = failure;
-         while (deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts))
-         {
-            teardown();
-
-            try
-            {
-               Thread.sleep(setupInterval);
-            }
-            catch (InterruptedException e)
-            {
-               HornetQRALogger.LOGGER.debug("Interrupted trying to reconnect " + spec, e);
-               break;
-            }
-
-            if (reconnectCount < 1)
-            {
-               HornetQRALogger.LOGGER.attemptingReconnect(spec);
-            }
-            try
-            {
-               setup();
-               HornetQRALogger.LOGGER.reconnected();
-               break;
-            }
-            catch (Throwable t)
-            {
-               if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST)
-               {
-                  if (lastException == null || !(t instanceof ActiveMQNonExistentQueueException))
-                  {
-                     lastException = t;
-                     HornetQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
-                  }
-               }
-               else if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.NOT_CONNECTED)
-               {
-                  if (lastException == null || !(t instanceof ActiveMQNotConnectedException))
-                  {
-                     lastException = t;
-                     HornetQRALogger.LOGGER.awaitingJMSServerCreation();
-                  }
-               }
-               else
-               {
-                  HornetQRALogger.LOGGER.errorReconnecting(t, spec);
-               }
-            }
-            ++reconnectCount;
-         }
-      }
-      finally
-      {
-         // Leaving failure recovery loop
-         inFailure.set(false);
-      }
-   }
-
-   public HornetQConnectionFactory getConnectionFactory()
-   {
-      return this.factory;
-   }
-
-   /**
-    * Handles the setup
-    */
-   private class SetupActivation implements Work
-   {
-      public void run()
-      {
-         try
-         {
-            setup();
-         }
-         catch (Throwable t)
-         {
-            handleFailure(t);
-         }
-      }
-
-      public void release()
-      {
-      }
-   }
-}

Reply via email to