User: user57  
  Date: 01/07/20 19:08:02

  Modified:    src/main/org/jboss/ejb/plugins/jms JMSContainerInvoker.java
  Log:
   o Using ConnectionFactoryHelper to create connections
   o Factored out some of the common Queue & Topic initalization so they can
     both use the same code.  Note there is still some work that could be done
     here to clean things up.
   o Documented things a bit more.
  
  Revision  Changes    Path
  1.18      +253 -177  
jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
  
  Index: JMSContainerInvoker.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- JMSContainerInvoker.java  2001/07/09 21:14:28     1.17
  +++ JMSContainerInvoker.java  2001/07/21 02:08:01     1.18
  @@ -35,6 +35,7 @@
   
   import org.apache.log4j.Category;
   
  +import org.jboss.jms.ConnectionFactoryHelper;
   import org.jboss.ejb.MethodInvocation;
   import org.jboss.ejb.Container;
   import org.jboss.ejb.ContainerInvokerContainer;
  @@ -59,7 +60,7 @@
    * @author <a href="mailto:[EMAIL PROTECTED]";>Sebastien Alborini</a>
    * @author <a href="mailto:[EMAIL PROTECTED]";>Marc Fleury</a>
    * @author <a href="mailto:[EMAIL PROTECTED]";>Jason Dillon</a>
  - * @version $Revision: 1.17 $
  + * @version $Revision: 1.18 $
    */
   public class JMSContainerInvoker
      implements ContainerInvoker, XmlLoadable
  @@ -108,20 +109,17 @@
   
      // Public --------------------------------------------------------
       
  -   public void setOptimized(boolean optimize)
  -   {
  +   public void setOptimized(final boolean optimize) {
         log.debug("Container Invoker optimize set to " + optimize);
         this.optimize = optimize;
      }
   
  -   public boolean isOptimized()
  -   {
  +   public boolean isOptimized() {
         log.debug("Optimize in action: " + optimize);
         return optimize;
      }
   
  -   public EJBMetaData getEJBMetaData()
  -   {
  +   public EJBMetaData getEJBMetaData() {
         throw new Error("Not valid for MessageDriven beans");
      }
   
  @@ -171,7 +169,12 @@
      }
   
      // ContainerService implementation -------------------------------
  -    
  +
  +   /**
  +    * Set the container for which this is an invoker to.
  +    *
  +    * @param container    The container for which this is an invoker to.
  +    */
      public void setContainer(final Container container)
      {
         this.container = container;
  @@ -180,6 +183,8 @@
   
      /**
       * Return the JMSProviderAdapter that should be used.
  +    *
  +    * @return    The JMSProviderAdapter to use.
       */
      protected JMSProviderAdapter getJMSProviderAdapter()
         throws NamingException
  @@ -195,23 +200,160 @@
      }
   
      /**
  -    * Return the ServerSessionPoolFactory that should be used.
  +    * Create a new connection from the given factory.
  +    *
  +    * @param factory     An object that implements QueueConnectionFactory,
  +    *                    XAQueueConnectionFactory, TopicConnectionFactory or
  +    *                    XATopicConnectionFactory.
  +    * @param username    The username to use or null for no user.
  +    * @param password    The password for the given username or null if no
  +    * @return            A new connection from the given factory.
  +    *
  +    * @throws JMSException    Failed to create connection.
       */
  -   protected ServerSessionPoolFactory getServerSessionPoolFactory()
  -      throws NamingException
  +   protected Connection createConnection(final Object factory,
  +                                         final String username,
  +                                         final String password)
  +      throws JMSException
  +   {
  +      log.debug("attempting to create connection from factory: " + factory);
  +      connection = ConnectionFactoryHelper.createConnection
  +         (factory, username, password);
  +      log.debug("created connection: " + connection);
  +      
  +      return connection;
  +   }
  +
  +   /**
  +    * Parse the JNDI suffix from the given JNDI name.
  +    *
  +    * @param jndiname        The JNDI name used to lookup the destination.
  +    * @param defaultSuffix   The default suffix to use if parsing fails.
  +    * @return                The parsed suffix or the defaultSuffix
  +    */
  +   protected String parseJndiSuffix(final String jndiname,
  +                                    final String defautSuffix)
  +   {
  +      // jndiSuffix is merely the name that the user has given the MDB.
  +      // since the jndi name contains the message type I have to split 
  +      // at the "/" if there is no slash then I use the entire jndi name...
  +      String jndiSuffix = "";
  +      if (jndiname != null) {
  +         int indexOfSlash = jndiname.indexOf("/");
  +         if (indexOfSlash != -1) {
  +            jndiSuffix = jndiname.substring(indexOfSlash+1);
  +         } else {
  +            jndiSuffix = jndiname;
  +         }
  +      }
  +      else {
  +         // if the jndi name from jboss.xml is null then lets use the ejbName
  +         jndiSuffix = defautSuffix;
  +      }
  +
  +      return jndiSuffix;
  +   }
  +   
  +   /**
  +    * Create and or lookup a JMS destination.
  +    *
  +    * @param type          Either javax.jms.Queue or javax.jms.Topic.
  +    * @param ctx           The naming context to lookup destinations from.
  +    * @param jndiName      The name to use when looking up destinations.
  +    * @param jndiSuffix    The name to use when creating destinations.
  +    * @return              The destination.
  +    *
  +    * @throws IllegalArgumentException    Type is not Queue or Topic.
  +    */
  +   protected Destination createDestination(final Class type,
  +                                           final Context ctx,
  +                                           final String jndiName,
  +                                           final String jndiSuffix)
  +      throws Exception
      {
  +      try {
  +         // first try to look it up
  +         return (Destination)ctx.lookup(jndiName);
  +      }
  +      catch (NamingException e) {
  +         // if the lookup failes, the try to create it
  +         log.warn("destination not found: " + jndiName, e);
  +
  +         //
  +         // attempt to create the destination (note, this is very
  +         // very, very unportable).
  +         //
  +         MBeanServer server = (MBeanServer)
  +            MBeanServerFactory.findMBeanServer(null).iterator().next();
  +
  +         if (type == Topic.class) {
  +            server.invoke(new ObjectName("JMS","service","JMSServer"),
  +                          "newTopic", new Object[]{ jndiSuffix },
  +                          new String[] {"java.lang.String"});
  +         }
  +         else if (type == Queue.class) {
  +            server.invoke(new ObjectName("JMS","service","JMSServer"),
  +                          "newQueue", new Object[]{ jndiSuffix },
  +                          new String[] {"java.lang.String"});
  +         }
  +         else {
  +            // type was not a Topic or Queue, bad user
  +            throw new IllegalArgumentException
  +               ("expected javax.jms.Queue or javax.jms.Topic for type: " +
  +                type);
  +         }
  +         
  +         return (Destination)ctx.lookup(jndiName);
  +      }
  +   }
  +
  +   /**
  +    * Create a server session pool for the given connection.
  +    *
  +    * @param connection      The connection to use.
  +    * @param maxSession      The maximum number of sessions.
  +    * @param isTransacted    True if the sessions are transacted.
  +    * @param ack             The session acknowledgement mode.
  +    * @param listener        The message listener.
  +    * @return                A server session pool.
  +    *
  +    * @throws JMSException
  +    */
  +   protected ServerSessionPool
  +      createSessionPool(final Connection connection,
  +                        final int maxSession,
  +                        final boolean isTransacted,
  +                        final int ack,
  +                        final MessageListener listener)
  +      throws NamingException, JMSException
  +   {
  +      ServerSessionPool pool;
         Context context = new InitialContext();
  +      
         try {
  +         // first lookup the factory
            log.debug("looking up session pool factory: " +
                      serverSessionPoolFactoryJNDI);
  -         return (ServerSessionPoolFactory)
  +         ServerSessionPoolFactory factory = (ServerSessionPoolFactory)
               context.lookup(serverSessionPoolFactoryJNDI);
  +
  +         // the create the pool
  +         pool = factory.getServerSessionPool
  +            (connection, maxSession, isTransacted, ack, listener);
         }
         finally {
            context.close();
         }
  +
  +      return pool;
      }
   
  +   /**
  +    * Initialize the container invoker.  Sets up a connection, a server
  +    * session pool and a connection consumer for the configured destination.
  +    *
  +    * @throws Exception    Failed to initalize.
  +    */
      public void init() throws Exception
      {
         log.debug("initializing");
  @@ -229,121 +371,74 @@
         // Queue or Topic
         String destinationType = config.getDestinationType();
         
  -      // Is containermanages TX
  +      // Is containermanages TX (not used?)
         boolean isContainerManagedTx = config.isContainerManagedTx();
  -
         acknowledgeMode = config.getAcknowledgeMode();
   
         // Get configuration data from jboss.xml
         String destinationJNDI = config.getDestinationJndiName();
         String user = config.getUser();
  -
  -      // Set upp JNDI
  -      // connect to the JNDI server and get a reference to root context
  -      Context context = null;
  -
  +      String password = config.getPasswd();
  +      
  +      // Get the JMS provider
         JMSProviderAdapter adapter = getJMSProviderAdapter();
         log.debug("provider adapter: " + adapter);
  -      
  -      context = adapter.getInitialContext();
  +
  +      // Connect to the JNDI server and get a reference to root context
  +      Context context = adapter.getInitialContext();
         log.debug("context: " + context);
         
         // if we can't get the root context then exit with an exception
  -      if (context == null)
  -      {
  +      if (context == null) {
            throw new RuntimeException("Failed to get the root context");
         }
  -       
  -      // Set up pool
  -      ServerSessionPoolFactory poolFactory = getServerSessionPoolFactory();
   
  -      // jndiSuffix is merely the name that the user has given the MDB.
  -      // since the jndi name contains the message type I have to split 
  -      // at the "/" if there is no slash then I use the entire jndi name...
  -      String jndiSuffix = "";
  -      if (destinationJNDI != null) {
  -         int indexOfSlash = destinationJNDI.indexOf("/");
  -         if (indexOfSlash != -1) {
  -            jndiSuffix = destinationJNDI.substring(indexOfSlash+1);
  -         } else {
  -            jndiSuffix = destinationJNDI;
  -         }
  -         // if the jndi name from jboss.xml is null then lets use the ejbName
  -      } else {
  -         jndiSuffix = config.getEjbName();
  -      }
  +      // Get the JNDI suffix of the destination
  +      String jndiSuffix = parseJndiSuffix(destinationJNDI,
  +                                          config.getEjbName());
         log.debug("jndiSuffix: " + jndiSuffix);
         
  -      MBeanServer server = (MBeanServer)
  -         MBeanServerFactory.findMBeanServer(null).iterator().next();
  -       
         if (destinationType.equals("javax.jms.Topic")) 
         {
            log.debug("Got destination type Topic for " + config.getEjbName());
  -            
  -         // All classes are different between topics and queues!!
  -         TopicConnectionFactory topicFactory = 
  -            (TopicConnectionFactory)context.
  -            lookup(adapter.getTopicFactoryRef());
  -            
  -         // Do we have a user - this is messy code (should be done for queues to)
  -         TopicConnection topicConnection;
  -         if(user != null) 
  -         {
  -            log.debug("Creating topic connection with user: " + 
  -                      user + " passwd: " + config.getPasswd());
  -            topicConnection = topicFactory.
  -               createTopicConnection(user, config.getPasswd());
  -         }
  -         else 
  -         {
  -            topicConnection = topicFactory.createTopicConnection();
  -         }
  -            
  -         // Lookup destination
  -         // First Try a lookup.
  -         // If that lookup fails then try to contact the MBeanServer and inoke a 
new...
  -         // Then do lookup again..
  -         String topicJndi = "topic/"+jndiSuffix;
  -         Topic topic;
  -         try {
  -            topic = (Topic)context.lookup(topicJndi);
  -         } catch(NamingException ne) {
  -            log.error("JndiName not found:"+topicJndi +
  -                      "...attempting to recover", ne);
  -            server.invoke(new ObjectName("JMS","service","JMSServer"),
  -                          "newTopic", new Object[]{jndiSuffix},
  -                          new String[] {"java.lang.String"});
  -            topic = (Topic)context.lookup(topicJndi);
  -         }
  -            
  -         pool = poolFactory.getServerSessionPool
  -            (topicConnection,
  -             maxPoolSize,
  -             //Transacted
  -             true, 
  -             acknowledgeMode, 
  -             new MessageListenerImpl(this));
  +
  +         // create a topic connection
  +         Object factory = context.lookup(adapter.getTopicFactoryRef());
  +         TopicConnection tConnection = 
  +            (TopicConnection)createConnection(factory, user, password);
  +
  +         // lookup or create the destination topic
  +         Topic topic =
  +            (Topic)createDestination(Topic.class,
  +                                     context,
  +                                     "topic/" + jndiSuffix,
  +                                     jndiSuffix);
  +
  +         // set up the server session pool
  +         pool = createSessionPool(tConnection,
  +                                  maxPoolSize,
  +                                  true, // tx
  +                                  acknowledgeMode,
  +                                  new MessageListenerImpl(this));
   
            // To be no-durable or durable
            if (config.getSubscriptionDurability() != 
                MessageDrivenMetaData.DURABLE_SUBSCRIPTION) 
            {
               // Create non durable
  -            connectionConsumer = topicConnection.
  +            connectionConsumer = tConnection.
                  createConnectionConsumer(topic, 
                                           messageSelector, 
                                           pool, 
                                           maxMessagesNr); 
            } 
  -         else 
  -         {
  +         else {
               //Durable subscription
               String clientId = config.getClientId();
  -            String durableName = clientId != null ? clientId:
  -                    
  -               config.getEjbName();
  -            connectionConsumer = topicConnection.
  +            String durableName =
  +               clientId != null ? clientId: config.getEjbName();
  +               
  +            connectionConsumer = tConnection.
                  createDurableConnectionConsumer(topic, 
                                                  durableName,
                                                  messageSelector, 
  @@ -351,71 +446,45 @@
                                                  maxMessagesNr);
            }
               
  -         // set global connection, so we have something to
  -         // start() and close()
  -         connection = topicConnection;
            log.debug("Topic connectionConsumer set up");
  -
         }
         else if (destinationType.equals("javax.jms.Queue")) 
         {
  -         log.debug("Got destination type Queue");
  -         QueueConnectionFactory queueFactory = 
  -            (QueueConnectionFactory)context.lookup(adapter.getQueueFactoryRef());
  -            
  -         // Do we have a user
  -         QueueConnection queueConnection;
  -         if (user != null) 
  -         {
  -            queueConnection = queueFactory.
  -               createQueueConnection(user, config.getPasswd());
  -         } 
  -         else 
  -         {
  -            queueConnection = queueFactory.createQueueConnection();
  -         }
  -
  -         // Lookup destination
  -         // First Try a lookup.
  -         // If that lookup fails then try to contact the MBeanServer and inoke a 
new...
  -         // Then do lookup again..
  -         String queueJndi = "queue/"+jndiSuffix;
  -         Queue queue;
  -         try {
  -            queue = (Queue)context.lookup(queueJndi);
  -         } catch(NamingException ne) {
  -            log.error("JndiName not found:"+queueJndi +
  -                      "...attempting to recover", ne);
  -            server.invoke(new ObjectName("JMS:service=JMSServer"),
  -                          "newQueue", new Object[]{jndiSuffix},
  -                          new String[] {"java.lang.String"});
  -            queue = (Queue)context.lookup(queueJndi);
  -         }
  -            
  -         pool = poolFactory.
  -            getServerSessionPool(queueConnection,
  -                                 maxPoolSize, 
  -                                 //Transacted
  -                                 true, 
  -                                 acknowledgeMode, 
  -                                 new MessageListenerImpl(this));
  +         log.debug("Got destination type Queue for " + config.getEjbName());
  +         
  +         // create a queue connection
  +         Object qFactory = context.lookup(adapter.getQueueFactoryRef());
  +         QueueConnection qConnection =
  +            (QueueConnection)createConnection(qFactory, user, password);
  +
  +         // lookup or create the destination queue
  +         Queue queue = 
  +            (Queue)createDestination(Queue.class,
  +                                     context,
  +                                     "queue/" + jndiSuffix,
  +                                     jndiSuffix);
  +
  +         // set up the server session pool
  +         pool = createSessionPool(qConnection,
  +                                  maxPoolSize,
  +                                  true, // tx
  +                                  acknowledgeMode,
  +                                  new MessageListenerImpl(this));
   
  -         connectionConsumer = queueConnection.
  +         connectionConsumer = qConnection.
               createConnectionConsumer(queue, 
                                        messageSelector, 
                                        pool, 
                                        maxMessagesNr); 
   
  -         // set global connection, so we have something to
  -         // start() and close()
  -         connection = queueConnection;
            log.debug("Queue connectionConsumer set up");
         }
      }
   
  -   // Start the connection
  -   public void start()
  -      throws Exception
  +   /**
  +    * Start the connection.
  +    */
  +   public void start() throws Exception
      {
         log.debug("Starting JMSContainerInvoker");
         exListener = new ExceptionListenerImpl(this);
  @@ -423,7 +492,9 @@
         connection.start();
      }
       
  -   // Stop the connection
  +   /**
  +    * Stop the connection.
  +    */
      public void stop()
      {
         log.debug("Stopping JMSContainerInvoker");
  @@ -432,7 +503,6 @@
            exListener.stop();
         }
         innerStop();
  -     
      }
   
      /**
  @@ -443,6 +513,7 @@
         try {
            if (connection != null) {
               connection.setExceptionListener(null);
  +            log.debug("unset exception listener");
            }
         } catch (Exception e) {
            log.error("Could not set ExceptionListener to null", e);
  @@ -452,17 +523,21 @@
         try {
            if (connection != null) {
               connection.stop();
  +            log.debug("connection stopped");
            }
         } catch (Exception e) {
            log.error("Could not stop JMS connection", e);
         }
      }
  -    
       
  -   // Take down all fixtures
  +   /**
  +    * Take down all fixtures.
  +    */
      public void destroy()
      {
         log.debug("Destroying JMSContainerInvoker");
  +
  +      // clear the server session pool (if it is clearable)
         try {
            if (pool instanceof StdServerSessionPool) {
               StdServerSessionPool p = (StdServerSessionPool)pool;
  @@ -471,26 +546,29 @@
         } catch (Exception e) {
            log.error("Could not clear ServerSessionPool", e);
         }
  -             
  +
  +      // close the connection consumer
         try {
            if (connectionConsumer != null) {
               connectionConsumer.close();
            }
  -      } catch(Exception e) {
  +      } catch (Exception e) {
            log.error("Could not close consumer", e);
         }
  -        
  -      try {
  -         if (connection != null) {
  +
  +      // close the connection
  +      if (connection != null) {      
  +         try {
               connection.close();
  +         } catch (Exception e) {
  +            log.error("Could not close connection", e);
            }
  -      } catch(Exception e) {
  -         log.error("Could not close connection", e);
         }
      }
  -    
  -   // XmlLoadable implementation
       
  +   /**
  +    * XmlLoadable implementation
  +    */
      public void importXml(Element element) throws DeploymentException 
      {
         try {
  @@ -501,9 +579,9 @@
            String maxSize = MetaData.getElementContent
               (MetaData.getUniqueChild(element, "MaximumSize"));
            maxPoolSize = Integer.parseInt(maxSize);
  -      } catch(NumberFormatException e) {
  +      } catch (NumberFormatException e) {
            //Noop will take default value
  -      } catch(DeploymentException e) {
  +      } catch (DeploymentException e) {
            //Noop will take default value
         }
        
  @@ -522,7 +600,6 @@
            serverSessionPoolFactoryJNDI = "java:/"+serverSessionPoolFactoryJNDI;
      }
       
  -    
      // Package protected ---------------------------------------------
       
      // Protected -----------------------------------------------------
  @@ -530,7 +607,11 @@
      // Private -------------------------------------------------------
       
      // Inner classes -------------------------------------------------
  -    
  +
  +   /**
  +    * An implementation of MessageListener that passes messages on
  +    * to the container invoker.
  +    */
      class MessageListenerImpl
         implements MessageListener
      {
  @@ -555,19 +636,14 @@
               
            // Invoke, shuld we catch any Exceptions??
            try {
  -            invoker.invoke(// Object id - where used?
  -                           id,
  -                           // Method to invoke
  -                           ON_MESSAGE,
  -                           //argument
  -                           new Object[] {message},
  -                           //Transaction 
  -                           tm.getTransaction(),
  -                           //Principal
  -                           null,
  -                           //Cred
  -                           null);
  -         } catch(Exception e) {
  +            invoker.invoke(id,                     // Object id - where used?
  +                           ON_MESSAGE,             // Method to invoke
  +                           new Object[] {message}, // argument
  +                           tm.getTransaction(),    // Transaction 
  +                           null,                   // Principal
  +                           null);                  // Cred
  +         }
  +         catch (Exception e) {
               log.error("Exception in JMSCI message listener", e);
            }
         }
  @@ -619,7 +695,7 @@
                  tryIt = false;
                  log.info("OK - reconnected");
               }
  -            catch(Exception e) {
  +            catch (Exception e) {
                  log.error("MDB error reconnecting", e);
               }
            }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to