User: hiram   
  Date: 00/12/18 22:43:37

  Modified:    src/java/org/spydermq/server StartServer.java
                        PersistenceManager.java JMSServerQueueReceiver.java
                        JMSServer.java InvocationLayerFactory.java
  Log:
  Add XA support!  Well... I haven't tested it very much, but it's a start.
  
  Revision  Changes    Path
  1.5       +6 -2      spyderMQ/src/java/org/spydermq/server/StartServer.java
  
  Index: StartServer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/StartServer.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- StartServer.java  2000/12/16 03:27:50     1.4
  +++ StartServer.java  2000/12/19 06:43:35     1.5
  @@ -48,7 +48,7 @@
    *   @author Vincent Sheffer ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class StartServer implements Runnable
   {
  @@ -221,6 +221,8 @@
                                String name = element.getField("Name");
                                String topicConnectionFactoryJNDI = 
element.getField("TopicConnectionFactoryJNDI");
                                String queueConnectionFactoryJNDI = 
element.getField("QueueConnectionFactoryJNDI");
  +                             String xaTopicConnectionFactoryJNDI = 
element.getField("XATopicConnectionFactoryJNDI");
  +                             String xaQueueConnectionFactoryJNDI = 
element.getField("XAQueueConnectionFactoryJNDI");
   
                                //Set up the transports for the server
                                InvocationLayerFactory invocationLayerFactory= new 
InvocationLayerFactory();
  @@ -238,7 +240,9 @@
   
                                //(re)bind the connection factories in the JNDI 
namespace
                                
ctx.rebind(topicConnectionFactoryJNDI,invocationLayerFactory.spyTopicConnectionFactory);
  -                             
ctx.rebind(queueConnectionFactoryJNDI,invocationLayerFactory.spyQueueConnectionFactory);
  +                             
ctx.rebind(queueConnectionFactoryJNDI,invocationLayerFactory.spyQueueConnectionFactory);
                                
  +                             
ctx.rebind(xaTopicConnectionFactoryJNDI,invocationLayerFactory.spyXATopicConnectionFactory);
  +                             
ctx.rebind(xaQueueConnectionFactoryJNDI,invocationLayerFactory.spyXAQueueConnectionFactory);
                                
                        }
   
  
  
  
  1.2       +99 -11    spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/PersistenceManager.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- PersistenceManager.java   2000/12/16 03:27:50     1.1
  +++ PersistenceManager.java   2000/12/19 06:43:36     1.2
  @@ -20,12 +20,14 @@
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyMessage;
   
  +import org.spydermq.SpyDistributedConnection;
  +
   /**
    *   This class manages all persistence related services.
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class PersistenceManager {
   
  @@ -103,29 +105,31 @@
        public void commitTx(Long txId) throws javax.jms.JMSException {
   
                LinkedList tasks;
  -             synchronized (postCommitTasks) {
  -                     tasks = (LinkedList) postCommitTasks.remove(txId);
  +             synchronized( postCommitTasks ) {
  +                     tasks = (LinkedList)postCommitTasks.remove(txId);
  +                     postRollbackTasks.remove(txId);
                }
  -             if (tasks == null)
  +             if( tasks == null )
                        throw new javax.jms.JMSException("Transaction is not active.");
   
                spyTxLog.commitTx(txId);
  -
  +             
                synchronized (tasks) {
                        Iterator iter = tasks.iterator();
  -                     while (iter.hasNext()) {
  -                             Runnable task = (Runnable) iter.next();
  +                     while( iter.hasNext() ) {
  +                             Runnable task = (Runnable)iter.next();
                                task.run();
                        }
                }
  -
  +                     
        }
   
        
        public Long createTx() throws javax.jms.JMSException {
                Long txId = spyTxLog.createTx();
                synchronized (postCommitTasks) {
  -                     postCommitTasks.put(txId, new LinkedList());
  +                     postCommitTasks.put(txId, new LinkedList());            
  +                     postRollbackTasks.put(txId, new LinkedList());          
                }
                return txId;
        }
  @@ -198,13 +202,97 @@
        public void rollbackTx(Long txId) throws javax.jms.JMSException {
   
                LinkedList tasks;
  -             synchronized (postCommitTasks) {
  -                     tasks = (LinkedList) postCommitTasks.remove(txId);
  +             synchronized( postCommitTasks ) {
  +                     tasks = (LinkedList)postRollbackTasks.remove(txId);
  +                     postCommitTasks.remove(txId);
                }
  -             if (tasks == null)
  +             if( tasks == null )
                        throw new javax.jms.JMSException("Transaction is not active.");
   
                spyTxLog.rollbackTx(txId);
   
  +             synchronized (tasks) {
  +                     Iterator iter = tasks.iterator();
  +                     while( iter.hasNext() ) {
  +                             Runnable task = (Runnable)iter.next();
  +                             task.run();
  +                     }
  +             }
  +                     
  +     }
  +
  +     // Maps Global transactions to local transactions
  +     HashMap globalToLocal = new HashMap();
  +     // Maps (Long)txIds to LinkedList of Runnable tasks
  +     HashMap postRollbackTasks = new HashMap();
  +
  +     class GlobalXID implements Runnable {
  +             SpyDistributedConnection dc;
  +             Object xid;
  +
  +             GlobalXID(SpyDistributedConnection dc,Object xid) {
  +                     this.dc = dc;
  +                     this.xid = xid;
  +             }
  +             
  +             public boolean equals(Object obj)
  +             {
  +                     if (obj==null) return false;                    
  +                     if (obj.getClass()!=GlobalXID.class) return false;
  +                     return ((GlobalXID)obj).xid.equals( xid ) &&
  +                                ((GlobalXID)obj).dc.equals(dc);
  +             }
  +             
  +             public int hashCode() {
  +                     return xid.hashCode();
  +             }
  +
  +             public void run() {
  +                     synchronized (globalToLocal) {
  +                             globalToLocal.remove(this);                     
  +                     }
  +             }
  +     }
  +
  +     public void addPostRollbackTask(Long txId, Runnable task) throws 
javax.jms.JMSException {
  +
  +             LinkedList tasks;
  +             synchronized( postRollbackTasks ) {
  +                     tasks = (LinkedList)postRollbackTasks.get(txId);
  +             }
  +             if( tasks == null )
  +                     throw new javax.jms.JMSException("Transaction is not active.");
  +
  +             synchronized (tasks) {
  +                     tasks.addLast(task);
  +             }
  +                     
  +     }
  +
  +     public Long createTx(SpyDistributedConnection dc, Object xid) throws 
javax.jms.JMSException {
  +
  +             GlobalXID gxid = new GlobalXID(dc, xid);
  +             if( globalToLocal.containsValue(gxid) ) 
  +                     throw new JMSException("Duplicate transaction from: 
"+dc.getClientID()+" xid="+xid);
  +                     
  +             Long txId = createTx();
  +             globalToLocal.put(gxid, txId);
  +
  +             //Tasks to remove the global to local mappings on commit/rollback
  +             addPostCommitTask(txId, gxid);
  +             addPostRollbackTask(txId, gxid);
  +             
  +             return txId;
  +     }
  +
  +     public Long getPrepared(SpyDistributedConnection dc, Object xid) throws 
javax.jms.JMSException {
  +             
  +             GlobalXID gxid = new GlobalXID(dc, xid);
  +             Long txid = (Long)globalToLocal.get(gxid);
  +             
  +             if( txid == null )              
  +                     throw new JMSException("Transaction does not exist from: 
"+dc.getClientID()+" xid="+xid);
  +                     
  +             return txid;
        }
   }
  
  
  
  1.3       +9 -2      
spyderMQ/src/java/org/spydermq/server/JMSServerQueueReceiver.java
  
  Index: JMSServerQueueReceiver.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServerQueueReceiver.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- JMSServerQueueReceiver.java       2000/12/16 03:27:50     1.2
  +++ JMSServerQueueReceiver.java       2000/12/19 06:43:36     1.3
  @@ -22,7 +22,7 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class JMSServerQueueReceiver implements Serializable {
   
  @@ -186,7 +186,14 @@
                                
                                if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT 
) {
                                        
jmsSeverQueue.server.persistenceManager.remove(jmsSeverQueue.destination, m, txId);
  -                             }                               
  +                             }
  +                             
  +                             // We have to restore the message on a rollback if 
transacted
  +                             if( txId != null ) {
  +                                     Runnable task = new RestoreMessageTask(m);
  +                                     
jmsSeverQueue.server.persistenceManager.addPostRollbackTask(txId, task);
  +                             }
  +                                     
                                Log.log("Message Ack: " + m.messageId);
                        }
                }
  
  
  
  1.3       +67 -31    spyderMQ/src/java/org/spydermq/server/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- JMSServer.java    2000/12/16 03:27:50     1.2
  +++ JMSServer.java    2000/12/19 06:43:36     1.3
  @@ -26,7 +26,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class JMSServer 
                implements Runnable, JMSServerMBean
  @@ -372,37 +372,8 @@
                queue.addSubscriber(dc);
        }               
        
  -     /**
  -      * The following function performs a Unit Of Work.
  -      *
  -      */
  -     public void transact(SpyDistributedConnection dc, Transaction t) throws 
JMSException {
  -
  -             Long txId = persistenceManager.createTx();
  -
  -             try {
  -                     
  -                     if( t.messages != null ) {
  -                             for( int i=0; i < t.messages.length; i++ ) {
  -                                     addMessage(dc, t.messages[i], txId);           
                         
  -                             }
  -                     }
  -
  -                     if( t.acks != null ) {
  -                             for( int i=0; i < t.acks.length; i++ ) {
  -                                     acknowledge(dc, t.acks[i], txId);
  -                             }
  -                     }
  -
  -                     persistenceManager.commitTx(txId);
  -                     
  -             } catch ( JMSException e ) {
  -                     persistenceManager.rollbackTx(txId);
  -                     throw e;
  -             }
  -
  -     }       
        
  +     
        public void unsubscribe(SpyDistributedConnection dc,Destination dest) throws 
JMSException
        {
                Log.log("Server: 
unsubscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
  @@ -439,4 +410,69 @@
                return queue;   
        }
        
  +     /**
  +      * The following function performs a Unit Of Work.
  +      *
  +      */
  +     public void transact(SpyDistributedConnection dc, TransactionRequest t) throws 
JMSException {
  +
  +             
  +             if ( t.requestType == t.ONE_PHASE_COMMIT_REQUEST ) {
  +
  +                     Long txId = persistenceManager.createTx();
  +
  +                     try {
  +                             
  +                             if( t.messages != null ) {
  +                                     for( int i=0; i < t.messages.length; i++ ) {
  +                                             addMessage(dc, t.messages[i], txId);   
                                 
  +                                     }
  +                             }
  +
  +                             if( t.acks != null ) {
  +                                     for( int i=0; i < t.acks.length; i++ ) {
  +                                             acknowledge(dc, t.acks[i], txId);
  +                                     }
  +                             }
  +
  +                             persistenceManager.commitTx(txId);
  +                             
  +                     } catch ( JMSException e ) {
  +                             persistenceManager.rollbackTx(txId);
  +                             throw e;
  +                     }
  +             } else if ( t.requestType == t.TWO_PHASE_COMMIT_PREPARE_REQUEST) {
  +                     
  +                     Long txId = persistenceManager.createTx(dc, t.xid);
  +                     try {
  +                             
  +                             if( t.messages != null ) {
  +                                     for( int i=0; i < t.messages.length; i++ ) {
  +                                             addMessage(dc, t.messages[i], txId);   
                                 
  +                                     }
  +                             }
  +
  +                             if( t.acks != null ) {
  +                                     for( int i=0; i < t.acks.length; i++ ) {
  +                                             acknowledge(dc, t.acks[i], txId);
  +                                     }
  +                             }
  +                             
  +                     } catch ( JMSException e ) {
  +                             persistenceManager.rollbackTx(txId);
  +                             throw e;
  +                     }
  +             } else if ( t.requestType == t.TWO_PHASE_COMMIT_ROLLBACK_REQUEST ) {
  +
  +                     Long txId = persistenceManager.getPrepared(dc, t.xid);         
                 
  +                     persistenceManager.rollbackTx(txId);
  +
  +             } else if ( t.requestType == t.TWO_PHASE_COMMIT_COMMIT_REQUEST ) {
  +                     
  +                     Long txId = persistenceManager.getPrepared(dc, t.xid);         
                 
  +                     persistenceManager.commitTx(txId);
  +                     
  +             }
  +
  +     }
   }
  
  
  
  1.2       +7 -0      
spyderMQ/src/java/org/spydermq/server/InvocationLayerFactory.java
  
  Index: InvocationLayerFactory.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/InvocationLayerFactory.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- InvocationLayerFactory.java       2000/12/12 05:58:44     1.1
  +++ InvocationLayerFactory.java       2000/12/19 06:43:36     1.2
  @@ -22,6 +22,9 @@
   import java.rmi.server.UnicastRemoteObject;
   import java.rmi.Remote;
   
  +import org.spydermq.SpyXAQueueConnectionFactory;
  +import org.spydermq.SpyXATopicConnectionFactory;
  +
   public class InvocationLayerFactory
   {
   
  @@ -57,6 +60,10 @@
                //Create the Topic and Queue Connection Factory objects
                spyTopicConnectionFactory = new 
SpyTopicConnectionFactory(distributedConnectionFactory);
                spyQueueConnectionFactory = new 
SpyQueueConnectionFactory(distributedConnectionFactory);
  +             spyXATopicConnectionFactory = new 
SpyXATopicConnectionFactory(distributedConnectionFactory);
  +             spyXAQueueConnectionFactory = new 
SpyXAQueueConnectionFactory(distributedConnectionFactory);
        }
        
  +     SpyXAQueueConnectionFactory spyXAQueueConnectionFactory;
  +     SpyXATopicConnectionFactory spyXATopicConnectionFactory;
   }
  
  
  

Reply via email to