User: pkendall
  Date: 01/08/15 21:06:05

  Modified:    src/main/org/jboss/ejb/plugins/jms JMSContainerInvoker.java
  Log:
  Fix mdb undeploy bug.
  
  Revision  Changes    Path
  1.25      +643 -643  
jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
  
  Index: JMSContainerInvoker.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java,v
  retrieving revision 1.24
  retrieving revision 1.25
  diff -u -r1.24 -r1.25
  --- JMSContainerInvoker.java  2001/08/09 18:33:25     1.24
  +++ JMSContainerInvoker.java  2001/08/16 04:06:05     1.25
  @@ -54,657 +54,657 @@
   
   /**
    * ContainerInvoker for JMS MessageDrivenBeans, based on JRMPContainerInvoker.
  - *   
  + *
    * @author <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a>.
    * @author <a href="mailto:[EMAIL PROTECTED]";>Rickard Öberg</a>
    * @author <a href="mailto:[EMAIL PROTECTED]";>Sebastien Alborini</a>
    * @author <a href="mailto:[EMAIL PROTECTED]";>Marc Fleury</a>
    * @author <a href="mailto:[EMAIL PROTECTED]";>Jason Dillon</a>
  - * @version $Revision: 1.24 $
  + * @version $Revision: 1.25 $
    */
   public class JMSContainerInvoker
  -   implements ContainerInvoker, XmlLoadable
  +      implements ContainerInvoker, XmlLoadable
   {
  -   // Constants -----------------------------------------------------
  +      // Constants -----------------------------------------------------
   
  -   /** {@link MessageListener#onMessage} reference. */
  -   protected static /* final */ Method ON_MESSAGE;
  +      /** {@link MessageListener#onMessage} reference. */
  +      protected static /* final */ Method ON_MESSAGE;
   
  -   /**
  -    * Initialize the ON_MESSAGE reference.
  -    */
  -   static {
  -      try {
  -         final Class type = MessageListener.class;
  -         final Class arg = Message.class;
  -         ON_MESSAGE = type.getMethod("onMessage", new Class[] { arg });
  -      }
  -      catch (Exception e) {
  -         e.printStackTrace();
  -         throw new ExceptionInInitializerError(e);
  -      }
  -   }
  -
  -   /** Instance logger. */
  -   private final Category log = Category.getInstance(this.getClass());
  -    
  -   // Attributes ----------------------------------------------------
  -    
  -   protected boolean optimize; // = false;
  -   protected int maxMessagesNr = 1;
  -   protected int maxPoolSize = 15;
  -   protected String providerAdapterJNDI;
  -   protected String serverSessionPoolFactoryJNDI;
  -   protected int acknowledgeMode;
  -   protected Container container;
  -   protected Connection connection;
  -   protected ConnectionConsumer connectionConsumer;
  -   protected TransactionManager tm;
  -   protected ServerSessionPool pool;
  -   protected ExceptionListenerImpl exListener;
  -   protected String beanName;
  -
  -   // Static --------------------------------------------------------
  -
  -   // Constructors --------------------------------------------------
  -
  -   // Public --------------------------------------------------------
  -    
  -   public void setOptimized(final boolean optimize) {
  -      log.debug("Container Invoker optimize set to " + optimize);
  -      this.optimize = optimize;
  -   }
  -
  -   public boolean isOptimized() {
  -      log.debug("Optimize in action: " + optimize);
  -      return optimize;
  -   }
  -
  -   public EJBMetaData getEJBMetaData() {
  -      throw new Error("Not valid for MessageDriven beans");
  -   }
  -
  -   // ContainerInvoker implementation
  -    
  -   public EJBHome getEJBHome() {
  -      throw new Error("Not valid for MessageDriven beans");
  -   }
  -
  -   public EJBObject getStatelessSessionEJBObject() {
  -      throw new Error("Not valid for MessageDriven beans");
  -   }
  -
  -   public EJBObject getStatefulSessionEJBObject(Object id) {
  -      throw new Error("Not valid for MessageDriven beans");
  -   }
  -
  -   public EJBObject getEntityEJBObject(Object id) {
  -      throw new Error("Not valid for MessageDriven beans");
  -   }
  -
  -   public Collection getEntityCollection(Collection ids) {
  -      throw new Error("Not valid for MessageDriven beans");
  -   }
  -
  -   public Object invoke(Object id,
  -                        Method m,
  -                        Object[] args,
  -                        Transaction tx,
  -                        Principal identity,
  -                        Object credential)
  -      throws Exception
  -   {
  -      MethodInvocation mi =
  -         new MethodInvocation(id, m, args, tx, identity, credential);
  -       
  -      // Set the right context classloader
  -      ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
  -      Thread.currentThread().setContextClassLoader(container.getClassLoader());
  -       
  -      try {
  -         return container.invoke(mi);
  -      }
  -      finally {         
  -         Thread.currentThread().setContextClassLoader(oldCl);
  -      }
  -   }
  -
  -   // ContainerService implementation -------------------------------
  -
  -   /**
  -    * Set the container for which this is an invoker to.
  -    *
  -    * @param container    The container for which this is an invoker to.
  -    */
  -   public void setContainer(final Container container)
  -   {
  -      this.container = container;
  -      //jndiName = container.getBeanMetaData().getJndiName();
  -   }
  -
  -   /**
  -    * Return the JMSProviderAdapter that should be used.
  -    *
  -    * @return    The JMSProviderAdapter to use.
  -    */
  -   protected JMSProviderAdapter getJMSProviderAdapter()
  -      throws NamingException
  -   {
  -      Context context = new InitialContext();
  -      try {
  -         log.debug("looking up provider adapter: " + providerAdapterJNDI);
  -         return (JMSProviderAdapter)context.lookup(providerAdapterJNDI);
  -      }
  -      finally {
  -         context.close();
  -      }
  -   }
  -
  -   /**
  -    * Parse the JNDI suffix from the given JNDI name.
  -    *
  -    * @param jndiname        The JNDI name used to lookup the destination.
  -    * @param defaultSuffix   The default suffix to use if parsing fails.
  -    * @return                The parsed suffix or the defaultSuffix
  -    */
  -   protected String parseJndiSuffix(final String jndiname,
  -                                    final String defautSuffix)
  -   {
  -      // jndiSuffix is merely the name that the user has given the MDB.
  -      // since the jndi name contains the message type I have to split 
  -      // at the "/" if there is no slash then I use the entire jndi name...
  -      String jndiSuffix = "";
  -      if (jndiname != null) {
  -         int indexOfSlash = jndiname.indexOf("/");
  -         if (indexOfSlash != -1) {
  -            jndiSuffix = jndiname.substring(indexOfSlash+1);
  -         } else {
  -            jndiSuffix = jndiname;
  -         }
  -      }
  -      else {
  -         // if the jndi name from jboss.xml is null then lets use the ejbName
  -         jndiSuffix = defautSuffix;
  -      }
  -
  -      return jndiSuffix;
  -   }
  -   
  -   /**
  -    * Create and or lookup a JMS destination.
  -    *
  -    * @param type          Either javax.jms.Queue or javax.jms.Topic.
  -    * @param ctx           The naming context to lookup destinations from.
  -    * @param jndiName      The name to use when looking up destinations.
  -    * @param jndiSuffix    The name to use when creating destinations.
  -    * @return              The destination.
  -    *
  -    * @throws IllegalArgumentException    Type is not Queue or Topic.
  -    */
  -   protected Destination createDestination(final Class type,
  -                                           final Context ctx,
  -                                           final String jndiName,
  -                                           final String jndiSuffix)
  -      throws Exception
  -   {
  -      try {
  -         // first try to look it up
  -         return (Destination)ctx.lookup(jndiName);
  -      }
  -      catch (NamingException e) {
  -         // if the lookup failes, the try to create it
  -         log.warn("destination not found: " + jndiName + " reason: " + e);
  -         log.warn("creating a new temporary destination: " + jndiName);
  -         //
  -         // attempt to create the destination (note, this is very
  -         // very, very unportable).
  -         //
  -         MBeanServer server = (MBeanServer)
  -            MBeanServerFactory.findMBeanServer(null).iterator().next();
  -
  -         String methodName;
  -         if (type == Topic.class) {
  -            methodName = "createTopic";
  -         }
  -         else if (type == Queue.class) {
  -            methodName = "createQueue";
  -         }
  -         else {
  -            // type was not a Topic or Queue, bad user
  -            throw new IllegalArgumentException
  -               ("expected javax.jms.Queue or javax.jms.Topic: " + type);
  -         }
  -
  -         // invoke the server to create the destination
  -         server.invoke(new ObjectName("JBossMQ", "service", "Server"),
  -                       methodName,
  -                       new Object[] { jndiSuffix },
  -                       new String[] { "java.lang.String" });
  -
  -         // try to look it up again
  -         return (Destination)ctx.lookup(jndiName);
  -      }
  -   }
  -
  -   /**
  -    * Create a server session pool for the given connection.
  -    *
  -    * @param connection      The connection to use.
  -    * @param maxSession      The maximum number of sessions.
  -    * @param isTransacted    True if the sessions are transacted.
  -    * @param ack             The session acknowledgement mode.
  -    * @param listener        The message listener.
  -    * @return                A server session pool.
  -    *
  -    * @throws JMSException
  -    */
  -   protected ServerSessionPool
  -      createSessionPool(final Connection connection,
  -                        final int maxSession,
  -                        final boolean isTransacted,
  -                        final int ack,
  -                        final MessageListener listener)
  -      throws NamingException, JMSException
  -   {
  -      ServerSessionPool pool;
  -      Context context = new InitialContext();
  -      
  -      try {
  -         // first lookup the factory
  -         log.debug("looking up session pool factory: " +
  -                   serverSessionPoolFactoryJNDI);
  -         ServerSessionPoolFactory factory = (ServerSessionPoolFactory)
  -            context.lookup(serverSessionPoolFactoryJNDI);
  -
  -         // the create the pool
  -         pool = factory.getServerSessionPool
  -            (connection, maxSession, isTransacted, ack, listener);
  -      }
  -      finally {
  -         context.close();
  -      }
  -
  -      return pool;
  -   }
  -
  -   /**
  -    * Initialize the container invoker.  Sets up a connection, a server
  -    * session pool and a connection consumer for the configured destination.
  -    *
  -    * @throws Exception    Failed to initalize.
  -    */
  -   public void init() throws Exception
  -   {
  -      log.debug("initializing");
  -      
  -      // Store TM reference locally - should we test for CMT Required
  -      tm = container.getTransactionManager();
  -
  -      // Get configuration information - from EJB-xml
  -      MessageDrivenMetaData config =
  -         ((MessageDrivenMetaData)container.getBeanMetaData());
  -       
  -      // Selector
  -      String messageSelector = config.getMessageSelector();
  -      
  -      // Queue or Topic
  -      String destinationType = config.getDestinationType();
  -
  -      // Bean Name
  -      beanName = config.getEjbName();
  -      
  -      // Is containermanages TX (not used?)
  -      boolean isContainerManagedTx = config.isContainerManagedTx();
  -      acknowledgeMode = config.getAcknowledgeMode();
  -
  -      // Get configuration data from jboss.xml
  -      String destinationJNDI = config.getDestinationJndiName();
  -      String user = config.getUser();
  -      String password = config.getPasswd();
  -      
  -      // Get the JMS provider
  -      JMSProviderAdapter adapter = getJMSProviderAdapter();
  -      log.debug("provider adapter: " + adapter);
  -
  -      // Connect to the JNDI server and get a reference to root context
  -      Context context = adapter.getInitialContext();
  -      log.debug("context: " + context);
  -      
  -      // if we can't get the root context then exit with an exception
  -      if (context == null) {
  -         throw new RuntimeException("Failed to get the root context");
  -      }
  -
  -      // Get the JNDI suffix of the destination
  -      String jndiSuffix = parseJndiSuffix(destinationJNDI,
  -                                          config.getEjbName());
  -      log.debug("jndiSuffix: " + jndiSuffix);
  -      
  -      if (destinationType.equals("javax.jms.Topic")) 
  -      {
  -         log.debug("Got destination type Topic for " + config.getEjbName());
  -
  -         // create a topic connection
  -         Object factory = context.lookup(adapter.getTopicFactoryRef());
  -         TopicConnection tConnection = 
  -            (TopicConnection)ConnectionFactoryHelper.createTopicConnection
  -            (factory, user, password);
  -         connection = tConnection;
  -         
  -         // lookup or create the destination topic
  -         Topic topic =
  -            (Topic)createDestination(Topic.class,
  -                                     context,
  -                                     "topic/" + jndiSuffix,
  -                                     jndiSuffix);
  -
  -         // set up the server session pool
  -         pool = createSessionPool(tConnection,
  -                                  maxPoolSize,
  -                                  true, // tx
  -                                  acknowledgeMode,
  -                                  new MessageListenerImpl(this));
  -
  -         // To be no-durable or durable
  -         if (config.getSubscriptionDurability() != 
  -             MessageDrivenMetaData.DURABLE_SUBSCRIPTION) 
  -         {
  -            // Create non durable
  -            connectionConsumer = tConnection.
  -               createConnectionConsumer(topic, 
  -                                        messageSelector, 
  -                                        pool, 
  -                                        maxMessagesNr); 
  -         } 
  -         else {
  -            //Durable subscription
  -            String clientId = config.getClientId();
  -            String durableName =
  -               clientId != null ? clientId: config.getEjbName();
  -               
  -            connectionConsumer = tConnection.
  -               createDurableConnectionConsumer(topic, 
  -                                               durableName,
  -                                               messageSelector, 
  -                                               pool, 
  -                                               maxMessagesNr);
  -         }
  -            
  -         log.debug("Topic connectionConsumer set up");
  -      }
  -      else if (destinationType.equals("javax.jms.Queue")) 
  -      {
  -         log.debug("Got destination type Queue for " + config.getEjbName());
  -         
  -         // create a queue connection
  -         Object qFactory = context.lookup(adapter.getQueueFactoryRef());
  -         QueueConnection qConnection =
  -            (QueueConnection)ConnectionFactoryHelper.createQueueConnection
  -            (qFactory, user, password);
  -         connection = qConnection;
  -         
  -         // lookup or create the destination queue
  -         Queue queue = 
  -            (Queue)createDestination(Queue.class,
  -                                     context,
  -                                     "queue/" + jndiSuffix,
  -                                     jndiSuffix);
  -
  -         // set up the server session pool
  -         pool = createSessionPool(qConnection,
  -                                  maxPoolSize,
  -                                  true, // tx
  -                                  acknowledgeMode,
  -                                  new MessageListenerImpl(this));
  -         log.debug("server session pool: " + pool);
  -         
  -         // create the connection consumer
  -         connectionConsumer = qConnection.
  -            createConnectionConsumer(queue, 
  -                                     messageSelector, 
  -                                     pool, 
  -                                     maxMessagesNr); 
  -         log.debug("connection consumer: " + connectionConsumer);
  -      }
  -
  -      log.debug("initialized");
  -   }
  -
  -   /**
  -    * Start the connection.
  -    */
  -   public void start() throws Exception
  -   {
  -      log.debug("Starting JMSContainerInvoker for bean " + beanName);
  -      exListener = new ExceptionListenerImpl(this);
  -      connection.setExceptionListener(exListener);
  -      connection.start();
  -   }
  -    
  -   /**
  -    * Stop the connection.
  -    */
  -   public void stop()
  -   {
  -      log.debug("Stopping JMSContainerInvoker for bean " + beanName);
  -      // Silence the exception listener
  -      if (exListener != null) {
  -         exListener.stop();
  -      }
  -      innerStop();
  -   }
  -
  -   /**
  -    * Stop done from inside, we should not stop the 
  -    * exceptionListener in inner stop.
  -    */
  -   protected void innerStop() {
  -      try {
  -         if (connection != null) {
  -            connection.setExceptionListener(null);
  -            log.debug("unset exception listener");
  -         }
  -      } catch (Exception e) {
  -         log.error("Could not set ExceptionListener to null", e);
  -      }      
  -     
  -      // Stop the connection
  -      try {
  -         if (connection != null) {
  -            connection.stop();
  -            log.debug("connection stopped");
  -         }
  -      } catch (Exception e) {
  -         log.error("Could not stop JMS connection", e);
  -      }
  -   }
  -    
  -   /**
  -    * Take down all fixtures.
  -    */
  -   public void destroy()
  -   {
  -      log.debug("Destroying JMSContainerInvoker for bean " + beanName);
  -
  -      // clear the server session pool (if it is clearable)
  -      try {
  -         if (pool instanceof StdServerSessionPool) {
  -            StdServerSessionPool p = (StdServerSessionPool)pool;
  -            p.clear();
  -         }
  -      } catch (Exception e) {
  -         log.error("Could not clear ServerSessionPool", e);
  -      }
  -
  -      // close the connection consumer
  -      try {
  -         if (connectionConsumer != null) {
  -            connectionConsumer.close();
  -         }
  -      } catch (Exception e) {
  -         log.error("Could not close consumer", e);
  -      }
  -
  -      // close the connection
  -      if (connection != null) {      
  -         try {
  -            connection.close();
  -         } catch (Exception e) {
  -            log.error("Could not close connection", e);
  -         }
  -      }
  -   }
  -    
  -   /**
  -    * XmlLoadable implementation
  -    */
  -   public void importXml(Element element) throws DeploymentException 
  -   {
  -      try {
  -         String maxMessages = MetaData.getElementContent
  -            (MetaData.getUniqueChild(element, "MaxMessages"));
  -         maxMessagesNr = Integer.parseInt(maxMessages);
  -         
  -         String maxSize = MetaData.getElementContent
  -            (MetaData.getUniqueChild(element, "MaximumSize"));
  -         maxPoolSize = Integer.parseInt(maxSize);
  -      } catch (NumberFormatException e) {
  -         //Noop will take default value
  -      } catch (DeploymentException e) {
  -         //Noop will take default value
  -      }
  -     
  -      // If these are not found we will get a DeploymentException, I hope
  -      providerAdapterJNDI = MetaData.getElementContent
  -         (MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI"));
  -        
  -      serverSessionPoolFactoryJNDI = MetaData.getElementContent
  -         (MetaData.getUniqueChild(element, "ServerSessionPoolFactoryJNDI"));
  -     
  -      // Check java:/ prefix
  -      if (!providerAdapterJNDI.startsWith("java:/"))
  -         providerAdapterJNDI = "java:/"+providerAdapterJNDI;
  -        
  -      if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
  -         serverSessionPoolFactoryJNDI = "java:/"+serverSessionPoolFactoryJNDI;
  -   }
  -    
  -   // Package protected ---------------------------------------------
  -    
  -   // Protected -----------------------------------------------------
  -    
  -   // Private -------------------------------------------------------
  -    
  -   // Inner classes -------------------------------------------------
  -
  -   /**
  -    * An implementation of MessageListener that passes messages on
  -    * to the container invoker.
  -    */
  -   class MessageListenerImpl
  -      implements MessageListener
  -   {
  -      /** The container invoker. */
  -      JMSContainerInvoker invoker; // = null;
  -
  -      /**
  -       * Construct a <tt>MessageListenerImpl</tt>.
  -       *
  -       * @param invoker   The container invoker.  Must not be null.
  -       */
  -      MessageListenerImpl(final JMSContainerInvoker invoker) {
  -         // assert invoker != null;
  -         
  -         this.invoker = invoker;
  -      }
  -
  -      /**
  -       * Process a message.
  -       *
  -       * @param message    The message to process.
  -       */
  -      public void onMessage(final Message message)
  -      {
  -         // assert message != null;
  -         
  -         if (log.isDebugEnabled()) {
  -            log.debug("processing message: " + message);
  -         }
  -            
  -         Object id;
  -         try {
  -            id = message.getJMSMessageID();
  -         } catch (JMSException e) {
  -            // what ?
  -            id = "JMSContainerInvoker";
  -         }
  -            
  -         // Invoke, shuld we catch any Exceptions??
  -         try {
  -            invoker.invoke(id,                     // Object id - where used?
  -                           ON_MESSAGE,             // Method to invoke
  -                           new Object[] {message}, // argument
  -                           tm.getTransaction(),    // Transaction 
  -                           null,                   // Principal
  -                           null);                  // Cred
  -         }
  -         catch (Exception e) {
  -            log.error("Exception in JMSCI message listener", e);
  -         }
  -      }
  -   }
  -
  -   /**
  -    * ExceptionListener for failover handling.
  -    */
  -   class ExceptionListenerImpl
  -      implements ExceptionListener
  -   {
  -      JMSContainerInvoker invoker; // = null;
  -      Thread currentThread; // = null;
  -      boolean notStoped; // = true;
  -     
  -      ExceptionListenerImpl(final JMSContainerInvoker invoker) {
  -         this.invoker = invoker;
  -      }
  -
  -      void stop() {
  -         log.debug("stop requested");
  -            
  -         notStoped = false;
  -         if (currentThread != null) {
  -            currentThread.interrupt();
  -            log.debug("current thread interrupted");
  -         }
  -      }
  -
  -      public void onException(JMSException ex) {
  -         currentThread = Thread.currentThread();
  -         
  -         log.warn("MDB lost connection to provider", ex);
  -         boolean tryIt = true;
  -         while(tryIt && notStoped) {
  -            log.info("MDB Trying to reconnect...");
  -            try {
  -               try {
  -                  Thread.sleep(10000);
  -               } catch (InterruptedException ie) {
  -                  tryIt=false; return;
  -               }
  -                    
  -               // Reboot container
  -               invoker.innerStop();
  -               invoker.destroy();
  -               invoker.init();
  -               invoker.start();
  -               tryIt = false;
  -               log.info("OK - reconnected");
  -            }
  -            catch (Exception e) {
  -               log.error("MDB error reconnecting", e);
  -            }
  -         }
  -         currentThread = null;
  -      }
  -   }
  +      /**
  +             * Initialize the ON_MESSAGE reference.
  +             */
  +      static {
  +                     try {
  +                              final Class type = MessageListener.class;
  +                              final Class arg = Message.class;
  +                              ON_MESSAGE = type.getMethod("onMessage", new Class[] 
{ arg });
  +                     }
  +                     catch (Exception e) {
  +                              e.printStackTrace();
  +                              throw new ExceptionInInitializerError(e);
  +                     }
  +      }
  +
  +      /** Instance logger. */
  +      private final Category log = Category.getInstance(this.getClass());
  +
  +      // Attributes ----------------------------------------------------
  +
  +      protected boolean optimize; // = false;
  +      protected int maxMessagesNr = 1;
  +      protected int maxPoolSize = 15;
  +      protected String providerAdapterJNDI;
  +      protected String serverSessionPoolFactoryJNDI;
  +      protected int acknowledgeMode;
  +      protected Container container;
  +      protected Connection connection;
  +      protected ConnectionConsumer connectionConsumer;
  +      protected TransactionManager tm;
  +      protected ServerSessionPool pool;
  +      protected ExceptionListenerImpl exListener;
  +      protected String beanName;
  +
  +      // Static --------------------------------------------------------
  +
  +      // Constructors --------------------------------------------------
  +
  +      // Public --------------------------------------------------------
  +
  +      public void setOptimized(final boolean optimize) {
  +                     log.debug("Container Invoker optimize set to " + optimize);
  +                     this.optimize = optimize;
  +      }
  +
  +      public boolean isOptimized() {
  +                     log.debug("Optimize in action: " + optimize);
  +                     return optimize;
  +      }
  +
  +      public EJBMetaData getEJBMetaData() {
  +                     throw new Error("Not valid for MessageDriven beans");
  +      }
  +
  +      // ContainerInvoker implementation
  +
  +      public EJBHome getEJBHome() {
  +                     throw new Error("Not valid for MessageDriven beans");
  +      }
  +
  +      public EJBObject getStatelessSessionEJBObject() {
  +                     throw new Error("Not valid for MessageDriven beans");
  +      }
  +
  +      public EJBObject getStatefulSessionEJBObject(Object id) {
  +                     throw new Error("Not valid for MessageDriven beans");
  +      }
  +
  +      public EJBObject getEntityEJBObject(Object id) {
  +                     throw new Error("Not valid for MessageDriven beans");
  +      }
  +
  +      public Collection getEntityCollection(Collection ids) {
  +                     throw new Error("Not valid for MessageDriven beans");
  +      }
  +
  +      public Object invoke(Object id,
  +                                                                                    
         Method m,
  +                                                                                    
         Object[] args,
  +                                                                                    
         Transaction tx,
  +                                                                                    
         Principal identity,
  +                                                                                    
         Object credential)
  +                     throws Exception
  +      {
  +                     MethodInvocation mi =
  +                              new MethodInvocation(id, m, args, tx, identity, 
credential);
  +
  +                     // Set the right context classloader
  +                     ClassLoader oldCl = 
Thread.currentThread().getContextClassLoader();
  +                     
Thread.currentThread().setContextClassLoader(container.getClassLoader());
  +
  +                     try {
  +                              return container.invoke(mi);
  +                     }
  +                     finally {
  +                              Thread.currentThread().setContextClassLoader(oldCl);
  +                     }
  +      }
  +
  +      // ContainerService implementation -------------------------------
  +
  +      /**
  +             * Set the container for which this is an invoker to.
  +             *
  +             * @param container    The container for which this is an invoker to.
  +             */
  +      public void setContainer(final Container container)
  +      {
  +                     this.container = container;
  +                     //jndiName = container.getBeanMetaData().getJndiName();
  +      }
  +
  +      /**
  +             * Return the JMSProviderAdapter that should be used.
  +             *
  +             * @return    The JMSProviderAdapter to use.
  +             */
  +      protected JMSProviderAdapter getJMSProviderAdapter()
  +                     throws NamingException
  +      {
  +                     Context context = new InitialContext();
  +                     try {
  +                              log.debug("looking up provider adapter: " + 
providerAdapterJNDI);
  +                              return 
(JMSProviderAdapter)context.lookup(providerAdapterJNDI);
  +                     }
  +                     finally {
  +                              context.close();
  +                     }
  +      }
  +
  +      /**
  +             * Parse the JNDI suffix from the given JNDI name.
  +             *
  +             * @param jndiname        The JNDI name used to lookup the destination.
  +             * @param defaultSuffix   The default suffix to use if parsing fails.
  +             * @return                The parsed suffix or the defaultSuffix
  +             */
  +      protected String parseJndiSuffix(final String jndiname,
  +                                                                                    
                                                         final String defautSuffix)
  +      {
  +                     // jndiSuffix is merely the name that the user has given the 
MDB.
  +                     // since the jndi name contains the message type I have to 
split
  +                     // at the "/" if there is no slash then I use the entire jndi 
name...
  +                     String jndiSuffix = "";
  +                     if (jndiname != null) {
  +                              int indexOfSlash = jndiname.indexOf("/");
  +                              if (indexOfSlash != -1) {
  +                                             jndiSuffix = 
jndiname.substring(indexOfSlash+1);
  +                              } else {
  +                                             jndiSuffix = jndiname;
  +                              }
  +                     }
  +                     else {
  +                              // if the jndi name from jboss.xml is null then lets 
use the ejbName
  +                              jndiSuffix = defautSuffix;
  +                     }
  +
  +                     return jndiSuffix;
  +      }
  +
  +      /**
  +             * Create and or lookup a JMS destination.
  +             *
  +             * @param type          Either javax.jms.Queue or javax.jms.Topic.
  +             * @param ctx           The naming context to lookup destinations from.
  +             * @param jndiName      The name to use when looking up destinations.
  +             * @param jndiSuffix    The name to use when creating destinations.
  +             * @return              The destination.
  +             *
  +             * @throws IllegalArgumentException    Type is not Queue or Topic.
  +             */
  +      protected Destination createDestination(final Class type,
  +                                                                                    
                                                                                  
final Context ctx,
  +                                                                                    
                                                                                  
final String jndiName,
  +                                                                                    
                                                                                  
final String jndiSuffix)
  +                     throws Exception
  +      {
  +                     try {
  +                              // first try to look it up
  +                              return (Destination)ctx.lookup(jndiName);
  +                     }
  +                     catch (NamingException e) {
  +                              // if the lookup failes, the try to create it
  +                              log.warn("destination not found: " + jndiName + " 
reason: " + e);
  +                              log.warn("creating a new temporary destination: " + 
jndiName);
  +                              //
  +                              // attempt to create the destination (note, this is 
very
  +                              // very, very unportable).
  +                              //
  +                              MBeanServer server = (MBeanServer)
  +                                             
MBeanServerFactory.findMBeanServer(null).iterator().next();
  +
  +                              String methodName;
  +                              if (type == Topic.class) {
  +                                             methodName = "createTopic";
  +                              }
  +                              else if (type == Queue.class) {
  +                                             methodName = "createQueue";
  +                              }
  +                              else {
  +                                             // type was not a Topic or Queue, bad 
user
  +                                             throw new IllegalArgumentException
  +                                                      ("expected javax.jms.Queue or 
javax.jms.Topic: " + type);
  +                              }
  +
  +                              // invoke the server to create the destination
  +                              server.invoke(new ObjectName("JBossMQ", "service", 
"Server"),
  +                                                                                    
  methodName,
  +                                                                                    
  new Object[] { jndiSuffix },
  +                                                                                    
  new String[] { "java.lang.String" });
  +
  +                              // try to look it up again
  +                              return (Destination)ctx.lookup(jndiName);
  +                     }
  +      }
  +
  +      /**
  +             * Create a server session pool for the given connection.
  +             *
  +             * @param connection      The connection to use.
  +             * @param maxSession      The maximum number of sessions.
  +             * @param isTransacted    True if the sessions are transacted.
  +             * @param ack             The session acknowledgement mode.
  +             * @param listener        The message listener.
  +             * @return                A server session pool.
  +             *
  +             * @throws JMSException
  +             */
  +      protected ServerSessionPool
  +                     createSessionPool(final Connection connection,
  +                                                                                    
         final int maxSession,
  +                                                                                    
         final boolean isTransacted,
  +                                                                                    
         final int ack,
  +                                                                                    
         final MessageListener listener)
  +                     throws NamingException, JMSException
  +      {
  +                     ServerSessionPool pool;
  +                     Context context = new InitialContext();
  +
  +                     try {
  +                              // first lookup the factory
  +                              log.debug("looking up session pool factory: " +
  +                                                                      
serverSessionPoolFactoryJNDI);
  +                              ServerSessionPoolFactory factory = 
(ServerSessionPoolFactory)
  +                                             
context.lookup(serverSessionPoolFactoryJNDI);
  +
  +                              // the create the pool
  +                              pool = factory.getServerSessionPool
  +                                             (connection, maxSession, isTransacted, 
ack, listener);
  +                     }
  +                     finally {
  +                              context.close();
  +                     }
  +
  +                     return pool;
  +      }
  +
  +      /**
  +             * Initialize the container invoker.  Sets up a connection, a server
  +             * session pool and a connection consumer for the configured 
destination.
  +             *
  +             * @throws Exception    Failed to initalize.
  +             */
  +      public void init() throws Exception
  +      {
  +                     log.debug("initializing");
  +
  +                     // Store TM reference locally - should we test for CMT Required
  +                     tm = container.getTransactionManager();
  +
  +                     // Get configuration information - from EJB-xml
  +                     MessageDrivenMetaData config =
  +                              ((MessageDrivenMetaData)container.getBeanMetaData());
  +
  +                     // Selector
  +                     String messageSelector = config.getMessageSelector();
  +
  +                     // Queue or Topic
  +                     String destinationType = config.getDestinationType();
  +
  +                     // Bean Name
  +                     beanName = config.getEjbName();
  +
  +                     // Is containermanages TX (not used?)
  +                     boolean isContainerManagedTx = config.isContainerManagedTx();
  +                     acknowledgeMode = config.getAcknowledgeMode();
  +
  +                     // Get configuration data from jboss.xml
  +                     String destinationJNDI = config.getDestinationJndiName();
  +                     String user = config.getUser();
  +                     String password = config.getPasswd();
  +
  +                     // Get the JMS provider
  +                     JMSProviderAdapter adapter = getJMSProviderAdapter();
  +                     log.debug("provider adapter: " + adapter);
  +
  +                     // Connect to the JNDI server and get a reference to root 
context
  +                     Context context = adapter.getInitialContext();
  +                     log.debug("context: " + context);
  +
  +                     // if we can't get the root context then exit with an exception
  +                     if (context == null) {
  +                              throw new RuntimeException("Failed to get the root 
context");
  +                     }
  +
  +                     // Get the JNDI suffix of the destination
  +                     String jndiSuffix = parseJndiSuffix(destinationJNDI,
  +                                                                                    
                                                                                 
config.getEjbName());
  +                     log.debug("jndiSuffix: " + jndiSuffix);
  +
  +                     if (destinationType.equals("javax.jms.Topic"))
  +                     {
  +                              log.debug("Got destination type Topic for " + 
config.getEjbName());
  +
  +                              // create a topic connection
  +                              Object factory = 
context.lookup(adapter.getTopicFactoryRef());
  +                              TopicConnection tConnection =
  +                                             
(TopicConnection)ConnectionFactoryHelper.createTopicConnection
  +                                             (factory, user, password);
  +                              connection = tConnection;
  +
  +                              // lookup or create the destination topic
  +                              Topic topic =
  +                                             (Topic)createDestination(Topic.class,
  +                                                                                    
                                                          context,
  +                                                                                    
                                                          "topic/" + jndiSuffix,
  +                                                                                    
                                                          jndiSuffix);
  +
  +                              // set up the server session pool
  +                              pool = createSessionPool(tConnection,
  +                                                                                    
                                                 maxPoolSize,
  +                                                                                    
                                                 true, // tx
  +                                                                                    
                                                 acknowledgeMode,
  +                                                                                    
                                                 new MessageListenerImpl(this));
  +
  +                              // To be no-durable or durable
  +                              if (config.getSubscriptionDurability() !=
  +                                              
MessageDrivenMetaData.DURABLE_SUBSCRIPTION)
  +                              {
  +                                             // Create non durable
  +                                             connectionConsumer = tConnection.
  +                                                      
createConnectionConsumer(topic,
  +                                                                                    
                                                                         
messageSelector,
  +                                                                                    
                                                                         pool,
  +                                                                                    
                                                                         
maxMessagesNr);
  +                              }
  +                              else {
  +                                             //Durable subscription
  +                                             String clientId = config.getClientId();
  +                                             String durableName =
  +                                                      clientId != null ? clientId: 
config.getEjbName();
  +
  +                                             connectionConsumer = tConnection.
  +                                                      
createDurableConnectionConsumer(topic,
  +                                                                                    
                                                                                       
           durableName,
  +                                                                                    
                                                                                       
           messageSelector,
  +                                                                                    
                                                                                       
           pool,
  +                                                                                    
                                                                                       
           maxMessagesNr);
  +                              }
  +
  +                              log.debug("Topic connectionConsumer set up");
  +                     }
  +                     else if (destinationType.equals("javax.jms.Queue"))
  +                     {
  +                              log.debug("Got destination type Queue for " + 
config.getEjbName());
  +
  +                              // create a queue connection
  +                              Object qFactory = 
context.lookup(adapter.getQueueFactoryRef());
  +                              QueueConnection qConnection =
  +                                             
(QueueConnection)ConnectionFactoryHelper.createQueueConnection
  +                                             (qFactory, user, password);
  +                              connection = qConnection;
  +
  +                              // lookup or create the destination queue
  +                              Queue queue =
  +                                             (Queue)createDestination(Queue.class,
  +                                                                                    
                                                          context,
  +                                                                                    
                                                          "queue/" + jndiSuffix,
  +                                                                                    
                                                          jndiSuffix);
  +
  +                              // set up the server session pool
  +                              pool = createSessionPool(qConnection,
  +                                                                                    
                                                 maxPoolSize,
  +                                                                                    
                                                 true, // tx
  +                                                                                    
                                                 acknowledgeMode,
  +                                                                                    
                                                 new MessageListenerImpl(this));
  +                              log.debug("server session pool: " + pool);
  +
  +                              // create the connection consumer
  +                              connectionConsumer = qConnection.
  +                                             createConnectionConsumer(queue,
  +                                                                                    
                                                          messageSelector,
  +                                                                                    
                                                          pool,
  +                                                                                    
                                                          maxMessagesNr);
  +                              log.debug("connection consumer: " + 
connectionConsumer);
  +                     }
  +
  +                     log.debug("initialized");
  +      }
  +
  +      /**
  +             * Start the connection.
  +             */
  +      public void start() throws Exception
  +      {
  +                     log.debug("Starting JMSContainerInvoker for bean " + beanName);
  +                     exListener = new ExceptionListenerImpl(this);
  +                     connection.setExceptionListener(exListener);
  +                     connection.start();
  +      }
  +
  +      /**
  +             * Stop the connection.
  +             */
  +      public void stop()
  +      {
  +                     log.debug("Stopping JMSContainerInvoker for bean " + beanName);
  +                     // Silence the exception listener
  +                     if (exListener != null) {
  +                              exListener.stop();
  +                     }
  +                     innerStop();
  +      }
  +
  +      /**
  +             * Stop done from inside, we should not stop the
  +             * exceptionListener in inner stop.
  +             */
  +      protected void innerStop() {
  +                     try {
  +                              if (connection != null) {
  +                                             connection.setExceptionListener(null);
  +                                             log.debug("unset exception listener");
  +                              }
  +                     } catch (Exception e) {
  +                              log.error("Could not set ExceptionListener to null", 
e);
  +                     }
  +
  +                     // Stop the connection
  +                     try {
  +                              if (connection != null) {
  +                                             connection.stop();
  +                                             log.debug("connection stopped");
  +                              }
  +                     } catch (Exception e) {
  +                              log.error("Could not stop JMS connection", e);
  +                     }
  +      }
  +
  +      /**
  +             * Take down all fixtures.
  +             */
  +      public void destroy()
  +      {
  +                     log.debug("Destroying JMSContainerInvoker for bean " + 
beanName);
  +
  +                     // close the connection consumer
  +                     try {
  +                              if (connectionConsumer != null) {
  +                                             connectionConsumer.close();
  +                              }
  +                     } catch (Exception e) {
  +                              log.error("Could not close consumer", e);
  +                     }
  +
  +                     // clear the server session pool (if it is clearable)
  +                     try {
  +                              if (pool instanceof StdServerSessionPool) {
  +                                             StdServerSessionPool p = 
(StdServerSessionPool)pool;
  +                                             p.clear();
  +                              }
  +                     } catch (Exception e) {
  +                              log.error("Could not clear ServerSessionPool", e);
  +                     }
  +
  +                     // close the connection
  +                     if (connection != null) {
  +                              try {
  +                                             connection.close();
  +                              } catch (Exception e) {
  +                                             log.error("Could not close 
connection", e);
  +                              }
  +                     }
  +      }
  +
  +      /**
  +             * XmlLoadable implementation
  +             */
  +      public void importXml(Element element) throws DeploymentException
  +      {
  +                     try {
  +                              String maxMessages = MetaData.getElementContent
  +                                             (MetaData.getUniqueChild(element, 
"MaxMessages"));
  +                              maxMessagesNr = Integer.parseInt(maxMessages);
  +
  +                              String maxSize = MetaData.getElementContent
  +                                             (MetaData.getUniqueChild(element, 
"MaximumSize"));
  +                              maxPoolSize = Integer.parseInt(maxSize);
  +                     } catch (NumberFormatException e) {
  +                              //Noop will take default value
  +                     } catch (DeploymentException e) {
  +                              //Noop will take default value
  +                     }
  +
  +                     // If these are not found we will get a DeploymentException, I 
hope
  +                     providerAdapterJNDI = MetaData.getElementContent
  +                              (MetaData.getUniqueChild(element, 
"JMSProviderAdapterJNDI"));
  +
  +                     serverSessionPoolFactoryJNDI = MetaData.getElementContent
  +                              (MetaData.getUniqueChild(element, 
"ServerSessionPoolFactoryJNDI"));
  +
  +                     // Check java:/ prefix
  +                     if (!providerAdapterJNDI.startsWith("java:/"))
  +                              providerAdapterJNDI = "java:/"+providerAdapterJNDI;
  +
  +                     if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
  +                              serverSessionPoolFactoryJNDI = 
"java:/"+serverSessionPoolFactoryJNDI;
  +      }
  +
  +      // Package protected ---------------------------------------------
  +
  +      // Protected -----------------------------------------------------
  +
  +      // Private -------------------------------------------------------
  +
  +      // Inner classes -------------------------------------------------
  +
  +      /**
  +             * An implementation of MessageListener that passes messages on
  +             * to the container invoker.
  +             */
  +      class MessageListenerImpl
  +                     implements MessageListener
  +      {
  +                     /** The container invoker. */
  +                     JMSContainerInvoker invoker; // = null;
  +
  +                     /**
  +                      * Construct a <tt>MessageListenerImpl</tt>.
  +                      *
  +                      * @param invoker   The container invoker.  Must not be null.
  +                      */
  +                     MessageListenerImpl(final JMSContainerInvoker invoker) {
  +                              // assert invoker != null;
  +
  +                              this.invoker = invoker;
  +                     }
  +
  +                     /**
  +                      * Process a message.
  +                      *
  +                      * @param message    The message to process.
  +                      */
  +                     public void onMessage(final Message message)
  +                     {
  +                              // assert message != null;
  +
  +                              if (log.isDebugEnabled()) {
  +                                             log.debug("processing message: " + 
message);
  +                              }
  +
  +                              Object id;
  +                              try {
  +                                             id = message.getJMSMessageID();
  +                              } catch (JMSException e) {
  +                                             // what ?
  +                                             id = "JMSContainerInvoker";
  +                              }
  +
  +                              // Invoke, shuld we catch any Exceptions??
  +                              try {
  +                                             invoker.invoke(id,                     
// Object id - where used?
  +                                                                                    
                  ON_MESSAGE,             // Method to invoke
  +                                                                                    
                  new Object[] {message}, // argument
  +                                                                                    
                  tm.getTransaction(),    // Transaction
  +                                                                                    
                  null,                   // Principal
  +                                                                                    
                  null);                  // Cred
  +                              }
  +                              catch (Exception e) {
  +                                             log.error("Exception in JMSCI message 
listener", e);
  +                              }
  +                     }
  +      }
  +
  +      /**
  +             * ExceptionListener for failover handling.
  +             */
  +      class ExceptionListenerImpl
  +                     implements ExceptionListener
  +      {
  +                     JMSContainerInvoker invoker; // = null;
  +                     Thread currentThread; // = null;
  +                     boolean notStoped; // = true;
  +
  +                     ExceptionListenerImpl(final JMSContainerInvoker invoker) {
  +                              this.invoker = invoker;
  +                     }
  +
  +                     void stop() {
  +                              log.debug("stop requested");
  +
  +                              notStoped = false;
  +                              if (currentThread != null) {
  +                                             currentThread.interrupt();
  +                                             log.debug("current thread 
interrupted");
  +                              }
  +                     }
  +
  +                     public void onException(JMSException ex) {
  +                              currentThread = Thread.currentThread();
  +
  +                              log.warn("MDB lost connection to provider", ex);
  +                              boolean tryIt = true;
  +                              while(tryIt && notStoped) {
  +                                             log.info("MDB Trying to reconnect...");
  +                                             try {
  +                                                      try {
  +                                                                     
Thread.sleep(10000);
  +                                                      } catch (InterruptedException 
ie) {
  +                                                                     tryIt=false; 
return;
  +                                                      }
  +
  +                                                      // Reboot container
  +                                                      invoker.innerStop();
  +                                                      invoker.destroy();
  +                                                      invoker.init();
  +                                                      invoker.start();
  +                                                      tryIt = false;
  +                                                      log.info("OK - reconnected");
  +                                             }
  +                                             catch (Exception e) {
  +                                                      log.error("MDB error 
reconnecting", e);
  +                                             }
  +                              }
  +                              currentThread = null;
  +                     }
  +      }
   }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to