User: hiram   
  Date: 00/12/18 22:43:35

  Modified:    src/java/org/spydermq SpyTopicSession.java SpySession.java
                        SpyQueueSession.java SpyQueueReceiver.java
                        SpyMessageConsumer.java SpyConnection.java
  Added:       src/java/org/spydermq SpyXAQueueConnection.java
                        SpyXAQueueConnectionFactory.java SpyXAResource.java
                        SpyXAResourceManager.java SpyXATopicConnection.java
                        SpyXATopicConnectionFactory.java
                        TransactionRequest.java
  Removed:     src/java/org/spydermq Transaction.java
  Log:
  Add XA support. It has not been tested very much yet, but it's a start.
  
  Revision  Changes    Path
  1.10      +17 -4     spyderMQ/src/java/org/spydermq/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- SpyTopicSession.java      2000/12/12 05:58:57     1.9
  +++ SpyTopicSession.java      2000/12/19 06:43:33     1.10
  @@ -13,6 +13,7 @@
   import javax.jms.JMSException;
   import javax.jms.TopicPublisher;
   import javax.jms.TemporaryTopic;
  +import javax.jms.XATopicSession;
   
   import java.util.Collection;
   import java.util.HashSet;
  @@ -25,24 +26,30 @@
   import org.spydermq.selectors.Selector; 
   import org.spydermq.Log;
   
  +
   /**
  - *   This class implements javax.jms.TopicSession
  + *   This class implements javax.jms.TopicSession and javax.jms.XATopicSession
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.9 $
  + *   @version $Revision: 1.10 $
    */
   public class SpyTopicSession 
        extends SpySession 
  -     implements TopicSession
  +     implements TopicSession, javax.jms.XATopicSession
   {
        
        // Constructor ---------------------------------------------------
           
  +     SpyTopicSession(SpyConnection myConnection, boolean transacted, int 
acknowledgeMode, boolean stop, boolean xaSession)
  +     {
  +             super(myConnection,transacted,acknowledgeMode,stop,xaSession);
  +     }
  +   
        SpyTopicSession(SpyConnection myConnection, boolean transacted, int 
acknowledgeMode, boolean stop)
        {
  -             super(myConnection,transacted,acknowledgeMode,stop);
  +             this(myConnection,transacted,acknowledgeMode,stop,false);
        }
   
        // Public --------------------------------------------------------
  @@ -110,4 +117,10 @@
                //Not yet implemented
        }
                
  +     /**
  +      * getTopicSession method comment.
  +      */
  +     public javax.jms.TopicSession getTopicSession() throws javax.jms.JMSException {
  +             return this;
  +     }
   }
  
  
  
  1.15      +90 -104   spyderMQ/src/java/org/spydermq/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- SpySession.java   2000/12/13 15:59:10     1.14
  +++ SpySession.java   2000/12/19 06:43:33     1.15
  @@ -16,6 +16,8 @@
   import javax.jms.MessageListener;
   import javax.jms.StreamMessage;
   import javax.jms.TextMessage;
  +import javax.jms.XASession;
  +import javax.transaction.xa.XAResource;
   
   import java.io.Serializable;
   import java.io.File;
  @@ -25,16 +27,17 @@
   import java.util.Iterator;
   import java.util.Collection;
    
  +
   /**
  - *   This class implements javax.jms.Session
  + *   This class implements javax.jms.Session and javax.jms.XASession
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.14 $
  + *   @version $Revision: 1.15 $
    */
   public class SpySession 
  -     implements Runnable, Session
  +     implements Runnable, Session, XASession
   {    
        // Attributes ----------------------------------------------------
        
  @@ -46,8 +49,7 @@
        private MessageListener messageListener;
        //The connection object to which this session is linked
        protected SpyConnection connection;
  -     //The outgoing message queue 
  -     protected LinkedList outgoingQueue;
  +
        //Is my connection in stopped mode ?
        protected boolean modeStop;
        //Is the session closed ?
  @@ -57,16 +59,23 @@
        //MessageConsumers created by this session
        protected HashSet consumers;
        
  +     //The transctionId of the current transaction (registed with the 
SpyXAResourceManager)
  +     Object currentTransactionId;
  +     // If this is an XASession, we have an associated XAResource
  +     SpyXAResource spyXAResource;
  +
        // Constructor ---------------------------------------------------             
    
   
  -     SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop)
  +     SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop, 
boolean xaSession)
        { 
   
                connection=conn;
  -             transacted=trans;
  +             transacted=trans;                       
                acknowledgeMode=acknowledge;
  -             outgoingQueue=new LinkedList();
                modeStop=stop;
  +             if( xaSession )
  +                     spyXAResource = new SpyXAResource(this);
  +
                messageListener=null;
                closed=false;
                mutex=new Mutex();
  @@ -79,9 +88,14 @@
                
                //Wait for the thread to sleep
                mutex.waitLocked();
  +
  +             //Have a TX ready with the resource manager.
  +             if( spyXAResource==null && transacted )
  +                     currentTransactionId = 
connection.spyXAResourceManager.startTx();
                        
        }
   
  +
        // Public --------------------------------------------------------
        
        public BytesMessage createBytesMessage() throws JMSException
  @@ -246,9 +260,11 @@
        //Commit a transacted session
        public synchronized void commit() throws JMSException
        {
  +             if (spyXAResource!=null) throw new 
javax.jms.TransactionInProgressException("Should not be call from a XASession");
                if (closed) throw new IllegalStateException("The session is closed");  
         
                if (!transacted) throw new IllegalStateException("The session is not 
transacted");
   
  +                     
                Log.log("Session: commit()");
   
                boolean modeSav=modeStop;
  @@ -257,23 +273,21 @@
                //Wait for the thread to sleep
                synchronized (mutex) {                  
                        mutex.waitToSleep();
  -                     Transaction transaction = new Transaction();
  -                     
  -                     // Send to server all published messages
  -                     if (outgoingQueue.size()!=0) {
  -                             SpyMessage job[]=new SpyMessage[outgoingQueue.size()];
  -                             job=(SpyMessage[])outgoingQueue.toArray(job);
  -                             transaction.messages = job;
  -                     }
  -                     
  -                     //Clear the outgoing queue
  -                     outgoingQueue.clear();
  -                     
  -                     //Acknowlege all consumed messages
  -                     transaction.acks = removeAcknowledgementItems();
  -                     
  -                     connection.send(transaction);
  -                     
  +
  +                     // commit transaction with onePhase commit
  +                     try {
  +                             
connection.spyXAResourceManager.commit(currentTransactionId, true);
  +                     } catch ( javax.transaction.xa.XAException e ) {
  +                             connection.failureHandler(e,"Could not commit");
  +                     } finally {
  +                             try {
  +                                     
connection.spyXAResourceManager.endTx(currentTransactionId, true);
  +                             } catch ( Exception ignore ) {}
  +                             try {
  +                                     currentTransactionId = 
connection.spyXAResourceManager.startTx();
  +                             } catch ( Exception ignore ) {}
  +                     }                                               
  +                                                                             
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
                        mutex.notifyLock();                     
  @@ -284,6 +298,7 @@
        //Rollback a transacted session
        public synchronized void rollback() throws JMSException
        {
  +             if (spyXAResource!=null) throw new 
javax.jms.TransactionInProgressException("Should not be call from a XASession");
                if (closed) throw new IllegalStateException("The session is closed");  
         
                if (!transacted) throw new IllegalStateException("The session is not 
transacted");
                                                                         
  @@ -296,19 +311,20 @@
                synchronized (mutex) {
                        
                        mutex.waitToSleep();
  -
  -                     Transaction transaction = new Transaction();
   
  -                     //Clear the outgoing queue
  -                     outgoingQueue.clear();
  -
  -                     //Neg Acknowlege all consumed messages
  -                     transaction.acks = removeAcknowledgementItems();
  -                     for( int i=0; i < transaction.acks.length; i++ ) {
  -                             transaction.acks[i].isAck = false;
  -                     }
  -                     
  -                     connection.send(transaction);
  +                     // rollback transaction
  +                     try {
  +                             
connection.spyXAResourceManager.rollback(currentTransactionId);
  +                     } catch ( javax.transaction.xa.XAException e ) {
  +                             connection.failureHandler(e,"Could not commit");
  +                     } finally {
  +                             try {
  +                                     
connection.spyXAResourceManager.endTx(currentTransactionId, true);
  +                             } catch ( Exception ignore ) {}
  +                             try {
  +                                     currentTransactionId = 
connection.spyXAResourceManager.startTx();
  +                             } catch ( Exception ignore ) {}
  +                     }                                               
                                                
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
  @@ -323,30 +339,9 @@
                                                                         
                Log.log("Session: recover()");
   
  -             boolean modeSav=modeStop;
  -             modeStop=true;
  +             rollback();
                
  -             //Wait for the thread to sleep
  -             synchronized (mutex) {
  -                     
  -                     mutex.waitToSleep();
  -
  -                     Transaction transaction = new Transaction();
  -
  -                     //Neg Acknowlege all consumed messages
  -                     transaction.acks = removeAcknowledgementItems();
  -                     for( int i=0; i < transaction.acks.length; i++ ) {
  -                             transaction.acks[i].isAck = false;
  -                     }
  -                     
  -                     connection.send(transaction);
  -                                             
  -                     //We have finished our work, we can wake up the thread
  -                     modeStop=modeSav;
  -                     mutex.notifyLock();
  -             }
                
  -             
        }
   
        public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
  @@ -379,18 +374,8 @@
                if (modeStop==newValue) return; 
                
                modeStop=newValue;
  -             
  -             if (modeStop) {
  -                     
  -                     //Wait for the thread to sleep
  -                     mutex.waitToSleep();
  -                                             
  -             } else {
  -                     
  -                     //Wake up the thread
  -                     mutex.notifyLock();
  -                     
  -             }
  +
  +             notifyStopMode();
                
        }
                
  @@ -423,49 +408,50 @@
        }
        
   
  -     //Get all the messages that have been consumed but need acks
  -     private SpyAcknowledgementItem[] removeAcknowledgementItems() 
  -     {
  -             //Count the messages
  -             int i=0;
  -             Iterator iter=consumers.iterator();
  -             while (iter.hasNext()) {
  -                     SpyMessageConsumer mc=(SpyMessageConsumer)iter.next();
  -                     i += mc.messagesConsumed.size();
  -             }       
  -             
  -             //Now fill  the array
  -             SpyAcknowledgementItem items[] = new SpyAcknowledgementItem[i];
  -             i=0;
  -             iter=consumers.iterator();
  -             while (iter.hasNext()) {
  -                     SpyMessageConsumer mc=(SpyMessageConsumer)iter.next();
  -                     Iterator mesIter = mc.messagesConsumed.iterator();
  -                     while(mesIter.hasNext()) {
  -                             String messageId = (String)mesIter.next();
  -                             SpyAcknowledgementItem item = new 
SpyAcknowledgementItem();
  -                             item.jmsDestination = mc.destination;
  -                             item.jmsMessageID = messageId;
  -                             item.isAck = true;
  -                             items[i++] = item;
  -                     }
  -                     mc.messagesConsumed.clear();
  -             }
  -             
  -             return items;
  -     }
  +
        
        //called by a MessageProducer object which needs to publish a message
        void sendMessage(SpyMessage m) throws JMSException {
                if (closed)
                        throw new IllegalStateException("The session is closed");
  -
  +             
                if( transacted ) {
  -                     outgoingQueue.add(m);
  +                     
connection.spyXAResourceManager.addMessage(currentTransactionId, m);
                } else {
                        connection.sendToServer(m);
                }
                
        }
  +
  +     
  +
  +     /**
  +      * getXAResource method comment.
  +      */
  +     public javax.transaction.xa.XAResource getXAResource() {
  +             return null;
  +     }
  +
  +     // The XA transaction state may have changed... either it started or ended
  +     // We have to wait until message delivery has stopped or wake up the thread
  +     void notifyStopMode()
  +     {
                
  +             // Should it be stopped because we are outside the XA transaction 
(start/end)
  +             boolean xaStop = spyXAResource!=null && currentTransactionId==null;
  +             
  +             if ( modeStop || xaStop ) {
  +                     
  +                     //Wait for the thread to sleep
  +                     mutex.waitToSleep();
  +                                             
  +             } else {
  +                     
  +                     //Wake up the thread
  +                     mutex.notifyLock();
  +                     
  +             }
  +             
  +     }
  +     
   }
  
  
  
  1.8       +19 -5     spyderMQ/src/java/org/spydermq/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SpyQueueSession.java      2000/12/12 05:58:58     1.7
  +++ SpyQueueSession.java      2000/12/19 06:43:33     1.8
  @@ -15,6 +15,7 @@
   import javax.jms.TemporaryQueue;
   import javax.jms.QueueBrowser;
   import javax.jms.DeliveryMode;
  +import javax.jms.XAQueueSession;
   
   import java.util.HashSet;
   import java.util.HashMap;
  @@ -22,25 +23,30 @@
   
   
   /**
  - *   This class implements javax.jms.QueueSession
  + *   This class implements javax.jms.QueueSession and javax.jms.XAQueueSession
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.7 $
  + *   @version $Revision: 1.8 $
    */
   public class SpyQueueSession 
        extends SpySession 
  -     implements QueueSession
  +     implements QueueSession, XAQueueSession
   {
        
        // Constructor ---------------------------------------------------
           
  -     SpyQueueSession(SpyConnection myConnection, boolean transacted, int 
acknowledgeMode, boolean stop)
  +     SpyQueueSession(SpyConnection myConnection, boolean transacted, int 
acknowledgeMode, boolean stop, boolean xaSession)
        {
  -             super(myConnection,transacted,acknowledgeMode,stop);
  +             super(myConnection,transacted,acknowledgeMode,stop, xaSession);
        }
   
  +     SpyQueueSession(SpyConnection myConnection, boolean transacted, int 
acknowledgeMode, boolean stop)
  +     {
  +             this(myConnection,transacted,acknowledgeMode,stop, false);
  +     }   
  +
        // Public --------------------------------------------------------
   
        public QueueBrowser createBrowser(Queue queue) throws JMSException
  @@ -97,4 +103,12 @@
                
                return ((SpyQueueConnection)connection).getTemporaryQueue();
        }
  +
  +     /**
  +      * getQueueSession method comment.
  +      */
  +     public QueueSession getQueueSession() throws javax.jms.JMSException {
  +             return this;
  +     }
  +
   }
  
  
  
  1.6       +3 -2      spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
  
  Index: SpyQueueReceiver.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- SpyQueueReceiver.java     2000/11/19 19:59:57     1.5
  +++ SpyQueueReceiver.java     2000/12/19 06:43:34     1.6
  @@ -16,8 +16,9 @@
    *   This class implements javax.jms.QueueReceiver
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
  + *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.5 $
  + *   @version $Revision: 1.6 $
    */
   public class SpyQueueReceiver 
        extends SpyMessageConsumer 
  @@ -208,7 +209,7 @@
                        messageListener.onMessage(mes);
   
                        if( session.transacted ) {
  -                             messagesConsumed.addLast(mes.getJMSMessageID());
  +                             
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
                        } else if( session.acknowledgeMode==session.AUTO_ACKNOWLEDGE 
|| session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
                                mes.doAcknowledge();
                        }       
  
  
  
  1.6       +6 -8      spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- SpyMessageConsumer.java   2000/12/13 15:59:09     1.5
  +++ SpyMessageConsumer.java   2000/12/19 06:43:34     1.6
  @@ -12,8 +12,10 @@
   import javax.jms.Message;
   import javax.jms.Session;
   import javax.jms.Destination;
  +
   import java.util.LinkedList;
   import java.util.Iterator;
  +
   import org.spydermq.selectors.Selector;
   
   /**
  @@ -22,7 +24,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.5 $
  + *   @version $Revision: 1.6 $
    */
   public class SpyMessageConsumer 
        implements MessageConsumer
  @@ -45,8 +47,7 @@
        //Is the consumer sleeping in a receive() ?
        boolean waitInReceive;
        public Destination destination; 
  -     //If the session is transacted: contains JMSMessageId's of messages consumed
  -     LinkedList messagesConsumed;    
  +     
        //Am I in noLocal mode ?
        boolean noLocal;
        
  @@ -61,9 +62,6 @@
                messageSelector=null;
                messages=new LinkedList();
                waitInReceive=false;
  -             if( session.transacted ) {
  -                     messagesConsumed = new LinkedList();
  -             }
        }
        
                
  @@ -170,7 +168,7 @@
                                        message.setSpySession(session);
   
                                        if( session.transacted ) {
  -                                             
messagesConsumed.addLast(message.getJMSMessageID());
  +                                             
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, 
message);
                                        } else if( 
session.acknowledgeMode==session.AUTO_ACKNOWLEDGE || 
session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
                                                message.doAcknowledge();
                                        }
  @@ -216,7 +214,7 @@
                                        
                                messageListener.onMessage(mes);
                                if( session.transacted ) {
  -                                     
messagesConsumed.addLast(mes.getJMSMessageID());
  +                                     
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
                                } else if( 
session.acknowledgeMode==session.AUTO_ACKNOWLEDGE || 
session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
                                        mes.doAcknowledge();
                                }       
  
  
  
  1.17      +15 -14    spyderMQ/src/java/org/spydermq/SpyConnection.java
  
  Index: SpyConnection.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
  retrieving revision 1.16
  retrieving revision 1.17
  diff -u -r1.16 -r1.17
  --- SpyConnection.java        2000/12/12 05:58:56     1.16
  +++ SpyConnection.java        2000/12/19 06:43:34     1.17
  @@ -32,7 +32,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.16 $
  + *   @version $Revision: 1.17 $
    */
   public class SpyConnection 
        implements Connection, Serializable
  @@ -60,6 +60,8 @@
        String crClassName;     
        //the exceptionListener
        private ExceptionListener exceptionListener;
  +     // Used to control tranactions
  +     SpyXAResourceManager spyXAResourceManager;
   
        // Constructor ---------------------------------------------------         
        SpyConnection(DistributedJMSServer theServer,String cID,String crCN) throws 
JMSException
  @@ -74,6 +76,7 @@
                modeStop=true;
                clientID=cID;
                crClassName=crCN;
  +             spyXAResourceManager = new SpyXAResourceManager(this);
        }
   
        // Public --------------------------------------------------------
  @@ -537,20 +540,8 @@
                        failureHandler(e,"Cannot acknowlege a message.");
                }               
        }       
  -     
  -     // Used to commit/rollback a transaction.
  -     protected void send(Transaction transaction) throws JMSException  {
  -             
  -             try {
  -                     provider.transact(distributedConnection, transaction);
  -             } catch (Exception e) {
  -                     failureHandler(e,"Cannot process a transaction.");
  -             }
  -             
  -     }       
  -     
  +                     
        //Send a message to the provider
  -     //[We should try to locally dispatch the message...]
        void sendToServer(SpyMessage mes) throws JMSException
        {
                if (closed) throw new IllegalStateException("The connection is 
closed");                
  @@ -565,4 +556,14 @@
                }
        }
   
  +     // Used to commit/rollback a transaction.
  +     protected void send(TransactionRequest transaction) throws JMSException  {
  +             
  +             try {
  +                     provider.transact(distributedConnection, transaction);
  +             } catch (Exception e) {
  +                     failureHandler(e,"Cannot process a transaction.");
  +             }
  +             
  +     }
   }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyXAQueueConnection.java
  
  Index: SpyXAQueueConnection.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.JMSException;
  import javax.jms.QueueSession;
  import javax.jms.XAQueueSession;
  import javax.jms.XAQueueConnection;
  
  import java.io.Serializable;
  
  import org.spydermq.distributed.interfaces.DistributedJMSServer;
  
  /**
   *    This class implements javax.jms.XAQueueConnection
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyXAQueueConnection 
        extends SpyQueueConnection 
        implements Serializable, XAQueueConnection 
  {
        
        // Constructor ---------------------------------------------------
        public SpyXAQueueConnection(DistributedJMSServer theServer,String cID,String 
crCN) throws JMSException
        { 
                super(theServer,cID,crCN);
        } 
  
        // Public --------------------------------------------------------
        
        public QueueSession createQueueSession(boolean transacted, int 
acknowledgeMode) throws JMSException
        {               
                return (QueueSession)createXAQueueSession();
        }
  
        
        /**
         * createXAQueueSession method comment.
         */
        public javax.jms.XAQueueSession createXAQueueSession() throws 
javax.jms.JMSException {
        
                if (closed) throw new IllegalStateException("The connection is 
closed");
                if (distributedConnection==null) createReceiver();
                                
                XAQueueSession session=new SpyQueueSession(this,true,0,modeStop,true);
                
                //add the new session to the createdSessions list 
                synchronized (createdSessions) {
                        createdSessions.add(session);
                }
                
                return session;
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyXAQueueConnectionFactory.java
  
  Index: SpyXAQueueConnectionFactory.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.QueueConnection;
  import javax.jms.XAQueueConnection;
  import javax.jms.XAQueueConnectionFactory;
  import javax.jms.JMSException;
  
  import org.spydermq.Log;
  import org.spydermq.security.SecurityManager;
  import org.spydermq.distributed.interfaces.DistributedJMSServer;
  import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
  
  import java.io.Serializable;
  import java.util.Properties;
  
  
  /**
   *    This class implements javax.jms.XAQueueConnectionFactory
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyXAQueueConnectionFactory 
        implements XAQueueConnectionFactory, Serializable
  { 
        // Attributes ----------------------------------------------------
  
        // Delegate that actually creates the connections.
        private DistributedConnectionFactory factory;
  
        // Constructor ---------------------------------------------------
           
        /**
         * Stores the delegate factory used by every create method.
         *
         * @param factory the delegate; it is stored as given, without validation
         */
        public SpyXAQueueConnectionFactory( DistributedConnectionFactory factory) throws Exception
        {               
                this.factory = factory;
        }
  
        // Public --------------------------------------------------------
  
        /**
         * Same as {@link #createXAQueueConnection()}; the XA connection is
         * returned through the plain QueueConnection type.
         */
        public QueueConnection createQueueConnection() throws JMSException
        { 
                return createXAQueueConnection();
        }
                
        /**
         * Same as {@link #createXAQueueConnection(String, String)}; the XA
         * connection is returned through the plain QueueConnection type.
         */
        public QueueConnection createQueueConnection(String userName, String password) throws JMSException
        {
                return createXAQueueConnection(userName, password);
        }
                
        //private
        
        /**
         * Logs the failure and converts it into a JMSException.
         * This method never returns normally.
         *
         * @param e      the underlying failure
         * @param reason message of the JMSException that is thrown
         * @throws JMSException always; carries e as its linked exception
         */
        private void failureHandler(Exception e,String reason) throws JMSException
        {
                Log.error(e);
                JMSException failure = new JMSException(reason);
                // Keep the root cause reachable for callers instead of dropping it.
                failure.setLinkedException(e);
                throw failure;
        }
        
        /**
         * Asks the delegate factory for a new XA queue connection.
         *
         * @throws JMSException rethrown as-is when the delegate throws one;
         *         any other exception is logged and wrapped
         */
        public javax.jms.XAQueueConnection createXAQueueConnection() throws javax.jms.JMSException {
                try {
                        return factory.createXAQueueConnection();
                } catch (JMSException e) {
                        throw e;
                } catch (Exception e) {
                        failureHandler(e,"createQueueConnection has failed !");
                        // Not reached: failureHandler always throws. Needed for the compiler only.
                        return null;
                }
        }
  
        /**
         * Asks the delegate factory for a new XA queue connection using the
         * given credentials.
         *
         * @throws JMSException if userName or password is null, or rethrown
         *         as-is when the delegate throws one; any other exception is
         *         logged and wrapped
         */
        public javax.jms.XAQueueConnection createXAQueueConnection(String userName, String password) throws javax.jms.JMSException {
                // Reject missing credentials before touching the delegate.
                if (userName==null||password==null) throw new JMSException("Invalid login or password !");
                try {
                        return factory.createXAQueueConnection(userName,password);
                } catch (JMSException e) {
                        throw e;
                } catch (Exception e) {
                        failureHandler(e,"createQueueConnection has failed !");
                        // Not reached: failureHandler always throws. Needed for the compiler only.
                        return null;
                }
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyXAResource.java
  
  Index: SpyXAResource.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.JMSException;
  import javax.transaction.xa.XAResource;
  import javax.transaction.xa.XAException;
  import javax.transaction.xa.Xid;
  
  /**
   *    This class implements the XAResource interface for use with an XASession.
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyXAResource implements XAResource {
        
        //////////////////////////////////////////////////////////////////
        // Attributes
        //////////////////////////////////////////////////////////////////
  
        // Session whose transaction association is driven through this resource.
        SpySession session;
  
        //////////////////////////////////////////////////////////////////
        // Constructors
        //////////////////////////////////////////////////////////////////
  
        /**
         * Creates the XA resource for the given session.
         *
         * @param session the session this resource acts on
         */
        SpyXAResource(SpySession session) {
                this.session = session;
        }
  
        //////////////////////////////////////////////////////////////////
        // Public Methods
        //////////////////////////////////////////////////////////////////
        
        /**
         * Asks the connection's resource manager to commit the transaction.
         *
         * @param xid the transaction to commit
         * @param onePhase true to request a one phase commit
         * @exception XAException with error code XAER_RMERR when the resource manager
         *            reports a JMSException, or whatever XAException it throws itself
         */
        public void commit(Xid xid, boolean onePhase) throws XAException {
                try {
                        session.connection.spyXAResourceManager.commit(xid, onePhase);
                } catch (JMSException e) {
                        throw rmError(e);
                }
        }
        
        /**
         * Dissociates the session from its current transaction.
         * TMSUSPEND is forwarded to suspendTx, TMFAIL to endTx(xid, false) and
         * TMSUCCESS to endTx(xid, true); afterwards session.notifyStopMode() is called.
         *
         * @param xid the transaction being ended or suspended
         * @param flags one of TMSUSPEND, TMFAIL or TMSUCCESS
         * @exception XAException XAER_OUTSIDE when the session has no current
         *            transaction id, XAER_INVAL when flags is none of the supported
         *            values, or whatever the resource manager throws
         */
        public void end(Xid xid, int flags) throws XAException {
                if (session.currentTransactionId == null) {
                        throw new XAException(XAException.XAER_OUTSIDE);
                }
                switch (flags) {
                        case TMSUSPEND :
                                session.currentTransactionId = null;
                                session.connection.spyXAResourceManager.suspendTx(xid);
                                break;
                        case TMFAIL :
                                session.currentTransactionId = null;
                                session.connection.spyXAResourceManager.endTx(xid, false);
                                break;
                        case TMSUCCESS :
                                session.currentTransactionId = null;
                                session.connection.spyXAResourceManager.endTx(xid, true);
                                break;
                        default :
                                // An unsupported flag used to be ignored silently, leaving
                                // the session associated with the transaction although the
                                // caller believed it had been ended.
                                throw new XAException(XAException.XAER_INVAL);
                }
                session.notifyStopMode();
        }
        
        /**
         * Does nothing; no state is kept per forgotten transaction by this resource.
         *
         * @param arg1 the transaction to forget (ignored)
         */
        public void forget(Xid arg1) throws XAException {
        }
        
        /**
         * Transaction timeouts are not implemented.
         *
         * @return always 0
         */
        public int getTransactionTimeout() throws XAException {
                return 0;
        }
        
        /**
         * Tells whether the other resource uses the same resource manager instance.
         *
         * @param arg1 the resource to compare with
         * @return true only when arg1 is a SpyXAResource whose session's connection
         *         holds the very same spyXAResourceManager object as this one
         */
        public boolean isSameRM(XAResource arg1) throws XAException {
                if (!(arg1 instanceof SpyXAResource))
                        return false;
                SpyXAResource other = (SpyXAResource) arg1;
                return other.session.connection.spyXAResourceManager == session.connection.spyXAResourceManager;
        }
        
        /**
         * Asks the connection's resource manager to prepare the transaction.
         *
         * @param xid the transaction to prepare
         * @return the vote returned by the resource manager
         * @exception XAException with error code XAER_RMERR when the resource manager
         *            reports a JMSException, or whatever XAException it throws itself
         */
        public int prepare(Xid xid) throws XAException {
                try {
                        return session.connection.spyXAResourceManager.prepare(xid);
                } catch (JMSException e) {
                        throw rmError(e);
                }
        }
        
        /**
         * Recovery is not implemented.
         *
         * @param arg1 the recovery scan flags (ignored)
         * @return always an empty array
         */
        public Xid[] recover(int arg1) throws XAException {
                return new Xid[0];
        }
        
        /**
         * Asks the connection's resource manager to roll the transaction back.
         *
         * @param xid the transaction to roll back
         * @exception XAException with error code XAER_RMERR when the resource manager
         *            reports a JMSException, or whatever XAException it throws itself
         */
        public void rollback(Xid xid) throws XAException {
                try {
                        session.connection.spyXAResourceManager.rollback(xid);
                } catch (JMSException e) {
                        throw rmError(e);
                }
        }
        
        /**
         * Transaction timeouts are not implemented.
         *
         * @param arg1 the requested timeout (ignored)
         * @return always false
         */
        public boolean setTransactionTimeout(int arg1) throws XAException {
                return false;
        }
        
        /**
         * Associates the session with a transaction.
         * TMNOFLAGS is forwarded to startTx, TMJOIN to joinTx and TMRESUME to
         * resumeTx; the value returned by the resource manager becomes the session's
         * current transaction id, and session.notifyStopMode() is called afterwards.
         *
         * @param xid the transaction to start, join or resume
         * @param flags one of TMNOFLAGS, TMJOIN or TMRESUME
         * @exception XAException XAER_OUTSIDE when the session already has a current
         *            transaction id, XAER_INVAL when flags is none of the supported
         *            values, or whatever the resource manager throws
         */
        public void start(Xid xid, int flags) throws XAException {
                if (session.currentTransactionId != null) {
                        throw new XAException(XAException.XAER_OUTSIDE);
                }
                switch (flags) {
                        case TMNOFLAGS :
                                session.currentTransactionId = session.connection.spyXAResourceManager.startTx(xid);
                                break;
                        case TMJOIN :
                                session.currentTransactionId = session.connection.spyXAResourceManager.joinTx(xid);
                                break;
                        case TMRESUME :
                                session.currentTransactionId = session.connection.spyXAResourceManager.resumeTx(xid);
                                break;
                        default :
                                // An unsupported flag used to fall through with no
                                // transaction id set, so the caller believed a transaction
                                // had been started when none was.
                                throw new XAException(XAException.XAER_INVAL);
                }
                session.notifyStopMode();
        }
  
        //////////////////////////////////////////////////////////////////
        // Private Methods
        //////////////////////////////////////////////////////////////////
  
        /**
         * Builds the XAException that reports a JMS failure of the resource manager.
         * The error code stays XAER_RMERR; the JMS failure is described in the message
         * so that it is not lost.
         *
         * @param cause the JMS failure reported by the resource manager
         * @return the exception to throw
         */
        private static XAException rmError(JMSException cause) {
                XAException failure = new XAException("Resource manager failure: " + cause);
                failure.errorCode = XAException.XAER_RMERR;
                return failure;
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyXAResourceManager.java
  
  Index: SpyXAResourceManager.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import java.util.HashMap;
  import java.util.LinkedList;
  
  import javax.transaction.xa.Xid;
  import javax.transaction.xa.XAException;
  import javax.jms.JMSException;
  
  /**
   *    This class implements the ResourceManager used for the XAResources
   *  used in spyderMQ.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyXAResourceManager implements java.io.Serializable {
  
        //////////////////////////////////////////////////////////////////
        // Attributes
        //////////////////////////////////////////////////////////////////
        
        SpyConnection connection;
        HashMap transactions = new HashMap();
        long nextInternalXid = Long.MIN_VALUE;
        
        //Valid tx states:
        private static final byte TX_OPEN = 0;
        private static final byte TX_SUSPENDED = 1;
        private static final byte TX_PREPARED = 2;
        private static final byte TX_COMMITED = 3;
        private static final byte TX_ROLLEDBACK = 4;
        
        //////////////////////////////////////////////////////////////////
        // Helper Inner classes
        //////////////////////////////////////////////////////////////////
        
        class TXState {
                byte txState = TX_OPEN;
                LinkedList sentMessages = new LinkedList();
                LinkedList ackedMessages = new LinkedList();
        }
        
        //////////////////////////////////////////////////////////////////
        // Constructors
        //////////////////////////////////////////////////////////////////
        
        public SpyXAResourceManager(SpyConnection conn) {
                super();
                connection = conn;
        }
        
        //////////////////////////////////////////////////////////////////
        // Public Methods
        //////////////////////////////////////////////////////////////////
        
        public void ackMessage(Object xid, SpyMessage msg) throws JMSException {
                TXState state = (TXState) transactions.get(xid);
                if (state == null)
                        throw new JMSException("Invalid transaction id.");
                SpyAcknowledgementItem item = new SpyAcknowledgementItem();
                item.jmsDestination = msg.getJMSDestination();
                item.jmsMessageID = msg.getJMSMessageID();
                item.isAck = true;
                state.ackedMessages.addLast(item);
        }
  
        public void addMessage(Object xid, SpyMessage msg) throws JMSException {
                TXState state = (TXState) transactions.get(xid);
                if (state == null)
                        throw new JMSException("Invalid transaction id.");
                state.sentMessages.addLast(msg);
        }
        
        public void commit(Object xid, boolean onePhase) throws XAException, 
JMSException {
                TXState state = (TXState) transactions.get(xid);
                if (state == null)
                        throw new XAException(XAException.XAER_NOTA);
                if (onePhase) {
                        TransactionRequest transaction = new TransactionRequest();
                        transaction.requestType = transaction.ONE_PHASE_COMMIT_REQUEST;
                        transaction.xid = null;
                        if (state.sentMessages.size() != 0) {
                                SpyMessage job[] = new 
SpyMessage[state.sentMessages.size()];
                                job = (SpyMessage[]) state.sentMessages.toArray(job);
                                transaction.messages = job;
                        }
                        if (state.ackedMessages.size() != 0) {
                                SpyAcknowledgementItem job[] = new 
SpyAcknowledgementItem[state.ackedMessages.size()];
                                job = (SpyAcknowledgementItem[]) 
state.ackedMessages.toArray(job);
                                transaction.acks = job;
                        }
                        connection.send(transaction);
                } else {
                        if (state.txState != TX_PREPARED)
                                throw new XAException("The transaction had not been 
prepared");
                        TransactionRequest transaction = new TransactionRequest();
                        transaction.xid = xid;
                        transaction.requestType = 
transaction.TWO_PHASE_COMMIT_COMMIT_REQUEST;
                        connection.send(transaction);
                }
                state.txState = TX_COMMITED;
        }
        
        public void endTx(Object xid, boolean success) throws XAException {
                Object state = transactions.remove(xid);
                if (state == null)
                        throw new XAException(XAException.XAER_NOTA);
        }
        
        public Object joinTx(Xid xid) throws XAException {
                if (!transactions.containsKey(xid))
                        throw new XAException(XAException.XAER_NOTA);
                return xid;
        }
        
        public int prepare(Object xid) throws XAException, JMSException {
                TXState state = (TXState) transactions.get(xid);
                if (state == null)
                        throw new XAException(XAException.XAER_NOTA);
                TransactionRequest transaction = new TransactionRequest();
                transaction.requestType = transaction.TWO_PHASE_COMMIT_PREPARE_REQUEST;
                transaction.xid = xid;
                if (state.sentMessages.size() != 0) {
                        SpyMessage job[] = new SpyMessage[state.sentMessages.size()];
                        job = (SpyMessage[]) state.sentMessages.toArray(job);
                        transaction.messages = job;
                }
                if (state.ackedMessages.size() != 0) {
                        SpyAcknowledgementItem job[] = new 
SpyAcknowledgementItem[state.ackedMessages.size()];
                        job = (SpyAcknowledgementItem[]) 
state.ackedMessages.toArray(job);
                        transaction.acks = job;
                }
                connection.send(transaction);
                state.txState = TX_PREPARED;
                return javax.transaction.xa.XAResource.XA_OK;
        }
        
        public Object resumeTx(Xid xid) throws XAException {
                if (!transactions.containsKey(xid))
                        throw new XAException(XAException.XAER_NOTA);
                return xid;
        }
        
        public void rollback(Object xid) throws XAException, JMSException {
                TXState state = (TXState) transactions.get(xid);
                if (state == null)
                        throw new XAException(XAException.XAER_NOTA);
                if (state.txState != TX_PREPARED) {
                        TransactionRequest transaction = new TransactionRequest();
                        transaction.requestType = transaction.ONE_PHASE_COMMIT_REQUEST;
                        transaction.xid = null;
                        if (state.ackedMessages.size() != 0) {
                                SpyAcknowledgementItem job[] = new 
SpyAcknowledgementItem[state.ackedMessages.size()];
                                job = (SpyAcknowledgementItem[]) 
state.ackedMessages.toArray(job);
                                transaction.acks = job;
                                //Neg Acknowlege all consumed messages
                                for (int i = 0; i < transaction.acks.length; i++) {
                                        transaction.acks[i].isAck = false;
                                }
                        }
                        connection.send(transaction);
                } else {
                        TransactionRequest transaction = new TransactionRequest();
                        transaction.xid = xid;
                        transaction.requestType = 
transaction.TWO_PHASE_COMMIT_ROLLBACK_REQUEST;
                        connection.send(transaction);
                }
                state.txState = TX_ROLLEDBACK;
        }
        
        public Object startTx() {
                Long newXid = new Long(nextInternalXid++);
                transactions.put(newXid, new TXState());
                return newXid;
        }
        
        public Object startTx(Xid xid) throws XAException {
                if (transactions.containsKey(xid))
                        throw new XAException(XAException.XAER_DUPID);
                transactions.put(xid, new TXState());
                return xid;
        }
        
        public Object suspendTx(Xid xid) throws XAException {
                if (!transactions.containsKey(xid))
                        throw new XAException(XAException.XAER_NOTA);
                return xid;
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyXATopicConnection.java
  
  Index: SpyXATopicConnection.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.XATopicConnection;
  import javax.jms.XATopicSession;
  import javax.jms.TopicSession;
  import javax.jms.JMSException;
  
  import java.io.Serializable;
  
  /**
   *    This class implements the XATopicConnection
   *  
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyXATopicConnection extends SpyTopicConnection implements Serializable, XATopicConnection {
  
        //////////////////////////////////////////////////////////////////
        // Constructors
        //////////////////////////////////////////////////////////////////
        
        /**
         * Builds the connection by passing all arguments to the SpyTopicConnection
         * constructor.
         *
         * @param theServer forwarded to the superclass constructor
         * @param cID forwarded to the superclass constructor
         * @param crCN forwarded to the superclass constructor
         * @exception javax.jms.JMSException when the superclass constructor fails
         */
        public SpyXATopicConnection(org.spydermq.distributed.interfaces.DistributedJMSServer theServer, String cID, String crCN) throws javax.jms.JMSException {
                super(theServer, cID, crCN);
        }
        
        //////////////////////////////////////////////////////////////////
        // Public Methods
        //////////////////////////////////////////////////////////////////
  
        /**
         * Returns a session created by createXATopicSession(), viewed as a
         * TopicSession. Both arguments are ignored.
         *
         * @param transacted ignored
         * @param acknowledgeMode ignored
         * @return the newly created session
         */
        public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
                XATopicSession xaSession = createXATopicSession();
                return (TopicSession) xaSession;
        }
        
        /**
         * Creates a new SpyTopicSession on this connection and registers it in
         * createdSessions. createReceiver() is called first when
         * distributedConnection is still null.
         *
         * @return the newly created session
         * @exception javax.jms.JMSException declared by this method
         */
        public XATopicSession createXATopicSession() throws javax.jms.JMSException {
                if (closed) {
                        // NOTE(review): with the imports of this file this appears to be
                        // java.lang.IllegalStateException, not javax.jms.IllegalStateException
                        // - confirm that this is intended.
                        throw new IllegalStateException("The connection is closed");
                }
                if (distributedConnection == null) {
                        createReceiver();
                }
                XATopicSession created = new SpyTopicSession(this, true, 0, modeStop, true);
                // Register the new session with the connection.
                synchronized (createdSessions) {
                        createdSessions.add(created);
                }
                return created;
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/SpyXATopicConnectionFactory.java
  
  Index: SpyXATopicConnectionFactory.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.TopicConnection;
  import javax.jms.TopicConnectionFactory;
  import javax.jms.XATopicConnectionFactory;
  import javax.jms.XATopicConnection;
  import javax.jms.JMSException;
  
  import org.spydermq.Log;
  import org.spydermq.security.SecurityManager;
  import org.spydermq.distributed.interfaces.DistributedJMSServer;
  import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
  
  import java.io.Serializable;
  import java.util.Properties;
  
  
  /**
   *    This class implements javax.jms.XATopicConnectionFactory
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class SpyXATopicConnectionFactory 
        implements XATopicConnectionFactory, Serializable
  { 
        //////////////////////////////////////////////////////////////////
        // Attributes
        //////////////////////////////////////////////////////////////////
  
        // Factory that actually creates the connections.
        private DistributedConnectionFactory factory;
  
        //////////////////////////////////////////////////////////////////
        // Constructors
        //////////////////////////////////////////////////////////////////
           
        /**
         * Wraps the given distributed connection factory.
         *
         * @param factory the factory every connection request is delegated to
         */
        public SpyXATopicConnectionFactory(DistributedConnectionFactory factory) throws Exception
        {               
                this.factory = factory;
        }
  
        //////////////////////////////////////////////////////////////////
        // Public Methods
        //////////////////////////////////////////////////////////////////
  
        /**
         * Same as createXATopicConnection().
         *
         * @return the new connection
         */
        public TopicConnection createTopicConnection() throws JMSException
        {
                return createXATopicConnection();
        }
                
        /**
         * Same as createXATopicConnection(userName, password).
         *
         * @param userName the user name, must not be null
         * @param password the password, must not be null
         * @return the new connection
         */
        public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
                return createXATopicConnection(userName, password);
        }
        
        /**
         * Creates an XA topic connection by delegating to the wrapped factory.
         *
         * @return the new connection
         * @exception JMSException when the wrapped factory fails; a failure that is
         *            not a JMSException is logged and attached as linked exception
         */
        public XATopicConnection createXATopicConnection() throws JMSException {
                try {
                        return factory.createXATopicConnection();
                } catch (JMSException e) {
                        throw e;
                } catch (Exception e) {
                        // The message names this method so the failing call can be identified.
                        failureHandler(e,"createXATopicConnection has failed !");
                        // Not reached (failureHandler always throws); required by the compiler.
                        return null;
                }
        }
  
        /**
         * Creates an XA topic connection for the given user by delegating to the
         * wrapped factory.
         *
         * @param userName the user name, must not be null
         * @param password the password, must not be null
         * @return the new connection
         * @exception JMSException when userName or password is null, or when the
         *            wrapped factory fails; a failure that is not a JMSException is
         *            logged and attached as linked exception
         */
        public XATopicConnection createXATopicConnection(String userName, String password) throws JMSException {
                try {
                        if (userName==null||password==null) throw new JMSException("Invalid login or password !");
                        return factory.createXATopicConnection(userName,password);
                } catch (JMSException e) {
                        throw e;
                } catch (Exception e) {
                        // The message names this method so the failing call can be identified.
                        failureHandler(e,"createXATopicConnection has failed !");
                        // Not reached (failureHandler always throws); required by the compiler.
                        return null;
                }
        }
        
        //////////////////////////////////////////////////////////////////
        // Private Methods
        //////////////////////////////////////////////////////////////////
        
        /**
         * Logs an unexpected failure and reports it to the caller as a JMSException.
         * This method never returns normally.
         *
         * @param e the exception that caused the failure
         * @param reason the message of the JMSException that is thrown
         * @exception JMSException always; it carries <code>reason</code> as its message
         *            and <code>e</code> as its linked exception
         */
        private void failureHandler(Exception e,String reason) throws JMSException
        {
                Log.error(e);
                // Keep the original exception reachable through getLinkedException()
                // instead of leaving it only in the log.
                JMSException failure = new JMSException(reason);
                failure.setLinkedException(e);
                throw failure;
        }
  
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/TransactionRequest.java
  
  Index: TransactionRequest.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import java.io.Serializable;
  
  /**
   *    This class contains all the data needed to perform a JMS transaction
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class TransactionRequest implements Serializable {
  
        // Valid request types, listed in order of their numeric value.
  
        /** Request type 0; also the initial value of requestType. */
        public static final byte ONE_PHASE_COMMIT_REQUEST = 0;
        /** Request type 1. */
        public static final byte TWO_PHASE_COMMIT_PREPARE_REQUEST = 1;
        /** Request type 2. */
        public static final byte TWO_PHASE_COMMIT_COMMIT_REQUEST = 2;
        /** Request type 3. */
        public static final byte TWO_PHASE_COMMIT_ROLLBACK_REQUEST = 3;
  
        /** The kind of request; one of the constants above. */
        public byte requestType = ONE_PHASE_COMMIT_REQUEST;
  
        /** For a two phase commit, identifies the transaction. */
        public Object xid;
  
        /** The messages sent in the transaction. */
        public SpyMessage[] messages;
  
        /** The messages acknowledged in the transaction. */
        public SpyAcknowledgementItem[] acks;
  
  }
  
  
  

Reply via email to