User: hiram   
  Date: 01/01/08 13:57:08

  Modified:    src/main/org/jboss/jms/asf StdServerSessionPool.java
                        StdServerSession.java
  Log:
  Transactional support added for MDBs.  Message receipt is now
  part of the transaction for a CMT bean with Required transactions.
  Hopefully this did not break anything else with MDBs.
  
  Revision  Changes    Path
  1.2       +25 -7     jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java
  
  Index: StdServerSessionPool.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- StdServerSessionPool.java 2000/12/27 22:45:53     1.1
  +++ StdServerSessionPool.java 2001/01/08 21:57:08     1.2
  @@ -25,8 +25,13 @@
   import javax.jms.ServerSessionPool;
   import javax.jms.MessageListener;
   import javax.jms.TopicConnection;
  +import javax.jms.XATopicConnection;
   import javax.jms.QueueConnection;
  +import javax.jms.XAQueueConnection;
   import javax.jms.Session;
  +import javax.jms.XASession;
  +import javax.jms.XAQueueSession;
  +import javax.jms.XATopicSession;
   
   import org.jboss.logging.Logger;
   /**
  @@ -50,6 +55,11 @@
       private ThreadPool threadPool = new ThreadPool();
       private Vector sessionPool = new Vector();
       
  +     boolean isTransacted() {
  +             return transacted;
  +     }
  +
  +
       /**
        * Minimal constructor, could also have stuff for pool size
        */
  @@ -125,15 +135,23 @@
                try {
                    // Here is the meat, that MUST follow the spec
                    Session ses = null;
  -                 if (con instanceof TopicConnection) {
  -                     ses = ((TopicConnection)con).createTopicSession(transacted, 
ack);
  +                 XASession xaSes = null;
   
  +                 if (con instanceof XATopicConnection) {
  +                             xaSes = 
((XATopicConnection)con).createXATopicSession();
  +                             ses = ((XATopicSession)xaSes).getTopicSession();
  +                 } else if(con instanceof XAQueueConnection) {
  +                             xaSes = 
((XAQueueConnection)con).createXAQueueSession();
  +                             ses = ((XAQueueSession)xaSes).getQueueSession();
  +                 } else if (con instanceof TopicConnection) {
  +                             ses = 
((TopicConnection)con).createTopicSession(transacted, ack);
  +                             Logger.error("WARNING: Using a non-XA TopicConnection. 
 It will not be able to participate in a Global UOW");
                    } else if(con instanceof QueueConnection) {
  -                     ses = ((QueueConnection)con).createQueueSession(transacted, 
ack);
  -                     Logger.debug("Creating a QueueSession" + ses);
  +                             ses = 
((QueueConnection)con).createQueueSession(transacted, ack);
  +                             Logger.error("WARNING: Using a non-XA QueueConnection. 
 It will not be able to participate in a Global UOW");
                    } else {
  -                     Logger.debug("Error in getting session for con" + con);
  -                     throw new JMSException("Connection was not reconizable: " + 
con);
  +                             Logger.debug("Error in getting session for con: " + 
con);
  +                             throw new JMSException("Connection was not 
reconizable: " + con);
                    }
                    
                    // This might not be totala spec compliant since it
  @@ -142,7 +160,7 @@
                    Logger.debug("Setting listener for session");
                    ses.setMessageListener(listener);
                    sessionPool.addElement(
  -                                        new StdServerSession(this, ses)
  +                                        new StdServerSession(this, ses, xaSes)
                                               );   
                }
                   catch (JMSException exception){
  
  
  
  1.2       +98 -5     jboss/src/main/org/jboss/jms/asf/StdServerSession.java
  
  Index: StdServerSession.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jboss/src/main/org/jboss/jms/asf/StdServerSession.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- StdServerSession.java     2000/12/27 22:45:53     1.1
  +++ StdServerSession.java     2001/01/08 21:57:08     1.2
  @@ -22,7 +22,14 @@
   import javax.jms.JMSException;
   import javax.jms.ServerSession;
   import javax.jms.Session;
  +import javax.jms.XASession;
  +import javax.naming.InitialContext;
   
  +import javax.transaction.Status;
  +import javax.transaction.Transaction;
  +import javax.transaction.TransactionManager;
  +import javax.transaction.xa.XAResource;
  +
   import org.jboss.logging.Logger;
   /**
    * StdServerSession.java
  @@ -37,11 +44,22 @@
   public class StdServerSession implements Runnable, ServerSession {
       private StdServerSessionPool serverSessionPool = null;
       private Session session = null;
  +    private XASession xaSession = null;
  +     private TransactionManager tm;
  +
       
  -    StdServerSession(StdServerSessionPool pool, Session session) throws 
JMSException{
  +    StdServerSession(StdServerSessionPool pool, Session session, XASession 
xaSession) throws JMSException{
   
        serverSessionPool = pool;
        this.session = session;
  +     this.xaSession = xaSession;
  +
  +     try {
  +             tm = (TransactionManager)new 
InitialContext().lookup("java:/TransactionManager");
  +     } catch ( Exception e ) {
  +             throw new JMSException("Transation Manager was not found");
  +     }
  +
       }
   
       // --- Impl of JMS standard API
  @@ -79,24 +97,99 @@
        * to the session to have been filled with messages and it will run
        * against the listener set in StdServerSessionPool. When it has send
        * all its messages it returns.
  +      *
  +      * HC: run() also starts a transaction with the TransactionManager and
  +      * enlists the XAResource of the JMS XASession if a XASession was abvailable.
  +      * A good JMS implementation should provide the XASession for use in the ASF.
  +      * So we optimize for the case where we have an XASession.  So, for the case
  +      * where we do not have an XASession and the bean is not transacted, we 
  +      * have the unneeded overhead of creating a Transaction.  I'm leaving it
  +      * this way since it keeps the code simpler and that case should not be too 
  +      * common (spyderMQ provides XASessions).
  +      *
        */
       public void run() {
  +
  +     Transaction trans=null;
  +
        try {
  +
            Logger.debug("Invoking run on session");
  +
  +             Logger.debug("Starting the Message Driven Bean transaction");
  +        tm.begin();
  +             trans = tm.getTransaction();
  +
  +             if( xaSession != null ) {
  +
  +             XAResource res = xaSession.getXAResource();
  +             trans.enlistResource(res);
  +             Logger.debug("XAResource '"+res+"' enlisted.");
  +
  +             } 
  +
            session.run();
  +
        }catch (Exception ex) {
  -         // Log error
  -     }finally {
  +     
  +             Logger.exception( ex );
  +
  +             try {
  +                     // The transaction will be rolledback in the finally
  +                     trans.setRollbackOnly();
  +             } catch( Exception e ) {
  +                     Logger.exception( e );
  +             }
  +             
  +     } finally {
  +
  +
  +             try {
  +
  +                     Logger.debug("Ending the Message Driven Bean transaction");
  +
  +             // Marked rollback
  +             if ( trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
  +                                
  +                     // actually roll it back 
  +             trans.rollback();
  +
  +                             // NO XASession? then manually rollback.  This is not 
so good but
  +                             // it's the best we can do if we have no XASession.
  +                             if( xaSession==null && 
serverSessionPool.isTransacted() ) 
  +                                     session.rollback();
  +
  +             } else if(trans.getStatus() == Status.STATUS_ACTIVE) {
  +                                     
  +             // Commit tx
  +             // This will happen if
  +             // a) everything goes well
  +             // b) app. exception was thrown
  +
  +             trans.commit();
  +
  +                             // NO XASession? then manually commit.  This is not so 
good but
  +                             // it's the best we can do if we have no XASession.
  +                             if( xaSession==null && 
serverSessionPool.isTransacted() ) 
  +                                     session.commit();
  +                            
  +             }
  +
  +        } catch(Exception e) {
  +                     // There was a problem doing the commit/rollback.
  +            Logger.exception(e);
  +             }
  +
            StdServerSession.this.recycle();
        }
       }
  -    
  +
       /**
        * This method is called by the ServerSessionPool when it is ready to
        * be recycled intot the pool
        */
       void recycle()
       {
  -     serverSessionPool.recycle(this);
  +             serverSessionPool.recycle(this);
       }
   } // StdServerSession
  
  
  

Reply via email to