User: hiram   
  Date: 00/12/15 19:27:51

  Modified:    src/java/org/spydermq/server JMSServer.java
                        JMSServerQueue.java JMSServerQueueReceiver.java
                        StartServer.java
  Added:       src/java/org/spydermq/server PersistenceManager.java
  Log:
  Better persistence and Transactions!
  Transactions now work like they should: either all operations get done, or none at all.
  (It should even roll back persistent messages that were logged as part
  of a transaction that did not complete due to abnormal server termination.)
  You can also configure the directory in which the persistence
  data is stored.
  
  Revision  Changes    Path
  1.2       +66 -37    spyderMQ/src/java/org/spydermq/server/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JMSServer.java    2000/12/12 05:58:46     1.1
  +++ JMSServer.java    2000/12/16 03:27:50     1.2
  @@ -26,20 +26,17 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class JMSServer 
                implements Runnable, JMSServerMBean
   {
  -     
  -     // Constants -----------------------------------------------------
  -     
  -     //number of threads in the pool (TO DO: this value should be dynamic)
  -     final int NB_THREADS=1;
  -     public static final String OBJECT_NAME = "JMS:service=JMSServer";
   
  -     // Attributes ----------------------------------------------------
  -   
  +     /////////////////////////////////////////////////////////////////////
  +     // Attributes
  +     /////////////////////////////////////////////////////////////////////
  +     public static final String OBJECT_NAME = "JMS:service=JMSServer";
  +     final int NB_THREADS=1;
        //messages pending for a Destination ( HashMap of JMSServerQueue objects )
        public HashMap messageQueue;
        //list of tasks pending ( linked list of JMSServerQueue objects )
  @@ -52,7 +49,9 @@
        private int lastTemporaryQueue; 
        //The security manager
        SecurityManager securityManager;
  -
  +     //The persistence manager
  +     PersistenceManager persistenceManager;
  +     
        /**
         * <code>true</code> when the server is running.  <code>false</code> when the 
         * server should stop running.
  @@ -66,8 +65,9 @@
         */
        private boolean stopped = true;
   
  -     // Constructor ---------------------------------------------------
  -   
  +     /////////////////////////////////////////////////////////////////////
  +     // Constructors
  +     /////////////////////////////////////////////////////////////////////   
        public JMSServer(SecurityManager securityManager)
        {
   
  @@ -88,6 +88,11 @@
                
        }
   
  +
  +     /////////////////////////////////////////////////////////////////////
  +     // Public Methods
  +     /////////////////////////////////////////////////////////////////////
  +     
        /**
         * Returns <code>false</code> if the JMS server is currently
         * running and handling requests, <code>true</code> otherwise.
  @@ -100,11 +105,8 @@
                return this.stopped;
        }
   
  -     // Public --------------------------------------------------------
  -
        //This is a correct threading system, but this is not ideal... 
  -     //We should let threads cycle through the JMSServerQueue list, and 
synchronized on the queue they are working on.
  -     
  +     //We should let threads cycle through the JMSServerQueue list, and 
synchronized on the queue they are working on.       
        public void run() {
                        while (alive) {
                                JMSServerQueue queue = null;                    
  @@ -326,24 +328,15 @@
   
        //Sent by a client to Ack or Nack a message.
        public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem 
item) throws JMSException
  -     {               
  -             JMSServerQueue 
serverQueue=(JMSServerQueue)messageQueue.get(item.jmsDestination);
  -             if (serverQueue==null) throw new JMSException("Destination does not 
exist: "+item.jmsDestination);
  -     
  -             serverQueue.acknowledge(dc, item.jmsMessageID, item.isAck);
  +     {
  +             acknowledge(dc, item, null);
        }
        
        
        //A connection has sent a new message
        public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws 
JMSException 
        {
  -     
  -             Log.notice("INCOMING: "+dc.getClientID()+" => "+val.jmsDestination);   
 
  -             JMSServerQueue 
queue=(JMSServerQueue)messageQueue.get(val.jmsDestination);
  -             if (queue==null) throw new JMSException("This destination does not 
exist !");   
  -             //Add the message to the queue
  -             queue.addMessage(val);
  -             
  +             addMessage( dc, val, null);             
        }       
        
        public void connectionListening(SpyDistributedConnection dc,boolean 
mode,Destination dest) throws JMSException
  @@ -380,24 +373,33 @@
        }               
        
        /**
  -      * TODO: The following function has to be performed as a Unit Of Work.
  +      * The following function performs a Unit Of Work.
         *
  -      * for now we just do a quick hack to make it work most of the time.
         */
        public void transact(SpyDistributedConnection dc, Transaction t) throws 
JMSException {
   
  -             if( t.messages != null ) {
  -                     for( int i=0; i < t.messages.length; i++ ) {
  -                             addMessage(dc, t.messages[i]);
  +             Long txId = persistenceManager.createTx();
  +
  +             try {
  +                     
  +                     if( t.messages != null ) {
  +                             for( int i=0; i < t.messages.length; i++ ) {
  +                                     addMessage(dc, t.messages[i], txId);           
                         
  +                             }
                        }
  -             }
   
  -             if( t.acks != null ) {
  -                     for( int i=0; i < t.acks.length; i++ ) {
  -                             acknowledge(dc, t.acks[i]);
  +                     if( t.acks != null ) {
  +                             for( int i=0; i < t.acks.length; i++ ) {
  +                                     acknowledge(dc, t.acks[i], txId);
  +                             }
                        }
  +
  +                     persistenceManager.commitTx(txId);
  +                     
  +             } catch ( JMSException e ) {
  +                     persistenceManager.rollbackTx(txId);
  +                     throw e;
                }
  -             
   
        }       
        
  @@ -410,4 +412,31 @@
                queue.removeSubscriber(dc,null);
        }
   
  +     //Sent by a client to Ack or Nack a message.
  +     public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem 
item, Long txId) throws JMSException
  +     {               
  +             JMSServerQueue 
serverQueue=(JMSServerQueue)messageQueue.get(item.jmsDestination);
  +             if (serverQueue==null) throw new JMSException("Destination does not 
exist: "+item.jmsDestination);      
  +             serverQueue.acknowledge(dc, item, txId);
  +     }       
  +     
  +     //A connection has sent a new message
  +     public void addMessage(SpyDistributedConnection dc, SpyMessage val, Long txId) 
throws JMSException 
  +     {
  +     
  +             Log.notice("INCOMING: (TX="+txId+")"+dc.getClientID()+" => 
"+val.jmsDestination);       
  +             JMSServerQueue 
queue=(JMSServerQueue)messageQueue.get(val.jmsDestination);
  +             if (queue==null) throw new JMSException("This destination does not 
exist !");   
  +             //Add the message to the queue
  +             queue.addMessage(val, txId);
  +             
  +     }
  +     
  +     public JMSServerQueue getServerQueue(SpyDestination d) throws JMSException 
  +     {               
  +             JMSServerQueue queue=(JMSServerQueue)messageQueue.get(d);
  +             if (queue==null) throw new JMSException("This destination does not 
exist !");
  +             return queue;   
  +     }
  +     
   }
  
  
  
  1.2       +63 -64    spyderMQ/src/java/org/spydermq/server/JMSServerQueue.java
  
  Index: JMSServerQueue.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServerQueue.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JMSServerQueue.java       2000/12/12 05:58:45     1.1
  +++ JMSServerQueue.java       2000/12/16 03:27:50     1.2
  @@ -25,7 +25,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class JMSServerQueue {
        // Attributes ----------------------------------------------------
  @@ -44,7 +44,7 @@
        //If this is linked to a temporaryDestination, 
temporaryDestination=DistributedConnection of the owner, otherwise it's null
        SpyDistributedConnection temporaryDestination;
        //The JMSServer object
  -     private JMSServer server;
  +     JMSServer server;
        //Am I a queue or a topic  
        boolean isTopic;
   
  @@ -58,8 +58,7 @@
        // Should we use the round robin aproach to pick the next reciver of a p2p 
message?
        private boolean useRoundRobinMessageDistribution = true;
   
  -     // Used to log the persistent messages.
  -     SpyMessageLog spyMessageLog;
  +
        
        
        // Constructor ---------------------------------------------------         
  @@ -74,15 +73,10 @@
                this.server=server;
                isTopic=dest instanceof SpyTopic;
                listeners=0;
  -             
  -             spyMessageLog = new SpyMessageLog( 
dest.getName()+"-transaction-log.dat" );
  -             SpyMessage[] rebuild = spyMessageLog.rebuildMessagesFromLog();
  -             for( int i=0; i < rebuild.length; i++ ) {
  -                     restoreMessage( rebuild[i] );
  -                     messageIdCounter = Math.max( messageIdCounter, 
rebuild[i].messageId+1 );
  -             }
  -             Log.notice("Restored "+rebuild.length+" messages to "+dest.getName()+" 
from the transaction log");
                
  +             if( !isTopic )
  +                     server.persistenceManager.initQueue(dest);
  +                     
        }
   
   
  @@ -122,37 +116,8 @@
                }
        }
   
  -
  -     
  -     public void addMessage(SpyMessage mes) throws JMSException
  -     {
  -             //Add a message to the message list... 
  -             synchronized (messages) 
  -             {                       
  -                     //Add the message to the queue
  -                     //messages is now an ordered tree. The order depends 
  -                     //first on the priority and then on messageId
  -                     mes.messageId = messageIdCounter++;
  -
  -                     if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
  -                             spyMessageLog.logAddMessage(mes);
  -                             spyMessageLog.commit();
  -                     }
  -                     
  -                     messages.add(mes);
  -                     
  -                     if (isTopic) {
  -                             //if a thread is already working on this destination, 
I don't have to myself to the taskqueue
  -                             if (!threadWorking) notifyWorkers();
  -                     } else {
  -                             if (listeners!=0&&!threadWorking) notifyWorkers();
  -                     }
  -
  -             }
  -     }
  -
   
  -     
  +             
        //Clear the message queue
        synchronized SpyMessage[] startWork()
        {
  @@ -295,16 +260,6 @@
                
                        //Clear the message queue
                        SpyMessage[] msgs=startWork();
  -
  -                     boolean msgRemoved=false;
  -                     for( int i=0; i < msgs.length; i++ ) {
  -                             if( msgs[i].getJMSDeliveryMode() == 
DeliveryMode.PERSISTENT ) {
  -                                     spyMessageLog.logRemoveMessage(msgs[i]);
  -                                     msgRemoved=true;
  -                             }
  -                     }
  -                     if( msgRemoved )
  -                             spyMessageLog.commit();
        
                        //Let the thread do its work
                        if (msgs.length == 1) {
  @@ -344,8 +299,7 @@
                                        
                                        if (mes.isOutdated()) {
                                                if( mes.getJMSDeliveryMode() == 
DeliveryMode.PERSISTENT ) {
  -                                                     
spyMessageLog.logRemoveMessage(mes);
  -                                                     spyMessageLog.commit();
  +                                                     
server.persistenceManager.remove(destination,mes, null);
                                                }
                                                continue;
                                        }
  @@ -419,18 +373,7 @@
                
                }               
        }       
  -
  -
        
  -     public void acknowledge(SpyDistributedConnection dc, String messageId, boolean 
isAck) throws JMSException       {
  -     
  -             JMSServerQueueReceiver qr = 
(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
  -             if( qr==null )
  -                     throw new JMSException("You have not subscribed to this 
destination.");
  -                     
  -             qr.acknowledge(messageId, isAck);
  -             
  -     }       
   
        
                
  @@ -559,5 +502,61 @@
   
                }
        }       
  +     
  +     public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem 
item, Long txId) throws JMSException        {
  +     
  +             JMSServerQueueReceiver qr = 
(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
  +             if( qr==null )
  +                     throw new JMSException("You have not subscribed to this 
destination.");
  +                     
  +             qr.acknowledge(item.jmsMessageID, item.isAck, txId);
  +             
  +     }       
  +     
  +     
  +     public void addMessage(SpyMessage mes, Long txId) throws JMSException
  +     {
  +
  +             // This task gets run to make the message visible in the queue.
  +             class AddMessagePostCommitTask implements Runnable {
  +                     SpyMessage message;
  +                     
  +                     AddMessagePostCommitTask(SpyMessage m) {
  +                             message = m;
  +                     }
  +                     
  +                     public void run() {
  +                             synchronized (messages) 
  +                             {
  +                                     //Add the message to the queue
  +                                     messages.add(message);                  
  +                                     if (isTopic) {
  +                                             //if a thread is already working on 
this destination, I don't have to myself to the taskqueue
  +                                             if (!threadWorking) notifyWorkers();
  +                                     } else {
  +                                             if (listeners!=0&&!threadWorking) 
notifyWorkers();
  +                                     }                                              
         
  +                             }                                       
  +                     }
  +             }
  +             
  +             //messages is now an ordered tree. The order depends 
  +             //first on the priority and then on messageId
  +             mes.messageId = messageIdCounter++;
  +
  +             //We are only persisting queues for now.
  +             if( !isTopic && mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) 
  +                     server.persistenceManager.add(destination, mes, txId);
  +
  +             // The message gets added to the queue after the transaction
  +             // commits (if the message was transacted)      
  +             Runnable task = new AddMessagePostCommitTask(mes);
  +             if( txId == null ) {
  +                     task.run();
  +             } else {
  +                     server.persistenceManager.addPostCommitTask(txId, task);
  +             }
  +             
  +     }
        
   }
  
  
  
  1.2       +48 -30    
spyderMQ/src/java/org/spydermq/server/JMSServerQueueReceiver.java
  
  Index: JMSServerQueueReceiver.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServerQueueReceiver.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JMSServerQueueReceiver.java       2000/12/12 05:58:43     1.1
  +++ JMSServerQueueReceiver.java       2000/12/16 03:27:50     1.2
  @@ -22,7 +22,7 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class JMSServerQueueReceiver implements Serializable {
   
  @@ -51,37 +51,7 @@
        }
   
        
  -     void acknowledge(String messageId, boolean ack) throws javax.jms.JMSException {
  -             SpyMessage m;
  -             synchronized (unacknowledgedMessages) {
  -                     m = (SpyMessage) unacknowledgedMessages.remove(messageId);
  -             }
   
  -             if (m == null)
  -                     return;
  -                     
  -             if (jmsSeverQueue.isTopic) {
  -                     // Not sure how we should handle the topic case.
  -                     // On a negative acknowledge, we don't want to
  -                     // add it back to the topic since other
  -                     // receivers might get a duplicate duplicate message.
  -             } else {                        
  -                     // Was it a negative acknowledge??
  -                     if (!ack) {
  -                             Log.log("Restoring message: " + m.messageId);
  -                             jmsSeverQueue.restoreMessage(m);
  -                     } else {
  -                             
  -                             if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT 
) {
  -                                     
jmsSeverQueue.spyMessageLog.logRemoveMessage(m);
  -                                     jmsSeverQueue.spyMessageLog.commit();
  -                             }
  -                             
  -                             Log.log("Message Ack: " + m.messageId);
  -                     }
  -             }
  -
  -     }
   
   
        // The connection is accepting new messages if there
  @@ -174,4 +144,52 @@
                }
        }
        
  +     void acknowledge(String messageId, boolean ack, Long txId) throws 
javax.jms.JMSException {
  +
  +             // This task gets run to place the neg ack a messge (place it back on 
the queue)
  +             class RestoreMessageTask implements Runnable {
  +                     SpyMessage message;
  +                     
  +                     RestoreMessageTask(SpyMessage m) {
  +                             message = m;
  +                     }
  +                     
  +                     public void run() {
  +                             Log.log("Restoring message: " + message.messageId);
  +                             jmsSeverQueue.restoreMessage(message);                 
 
  +                     }
  +             }               
  +             
  +             SpyMessage m;
  +             synchronized (unacknowledgedMessages) {
  +                     m = (SpyMessage) unacknowledgedMessages.remove(messageId);
  +             }
  +
  +             if (m == null)
  +                     return;
  +                     
  +             if (jmsSeverQueue.isTopic) {
  +                     // Not sure how we should handle the topic case.
  +                     // On a negative acknowledge, we don't want to
  +                     // add it back to the topic since other
  +                     // receivers might get a duplicate message.
  +             } else {                        
  +                     // Was it a negative acknowledge??
  +                     if (!ack) {
  +                             Runnable task = new RestoreMessageTask(m);
  +                             if( txId == null )
  +                                     task.run();
  +                             else
  +                                     
jmsSeverQueue.server.persistenceManager.addPostCommitTask(txId, task);
  +                             
  +                     } else {
  +                             
  +                             if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT 
) {
  +                                     
jmsSeverQueue.server.persistenceManager.remove(jmsSeverQueue.destination, m, txId);
  +                             }                               
  +                             Log.log("Message Ack: " + m.messageId);
  +                     }
  +             }
  +
  +     }
   }
  
  
  
  1.4       +17 -3     spyderMQ/src/java/org/spydermq/server/StartServer.java
  
  Index: StartServer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/StartServer.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- StartServer.java  2000/12/13 00:34:06     1.3
  +++ StartServer.java  2000/12/16 03:27:50     1.4
  @@ -36,9 +36,11 @@
   import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
   import org.spydermq.SpyTopicConnectionFactory;
   import org.spydermq.xml.XElement;
  +import org.spydermq.persistence.SpyTxLog;
   
   /**
    *   Class used to start a JMS service.  This can be called from inside another
  + 
    *   application to start the JMS provider.
    *
    *   @author Norbert Lataille ([EMAIL PROTECTED])
  @@ -46,7 +48,7 @@
    *   @author Vincent Sheffer ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class StartServer implements Runnable
   {
  @@ -164,6 +166,10 @@
   
                        //Create the JMSServer object
                        theServer = new JMSServer(securityManager);
  +
  +                     PersistenceManager persistenceManager = new 
PersistenceManager(theServer, serverCfg.getElement("PersistenceManager"));
  +                     theServer.persistenceManager = persistenceManager;
  +                     
                        registerService(theServer, new 
ObjectName(JMSServer.OBJECT_NAME));
   
                        //create the known topics
  @@ -175,7 +181,7 @@
                                String name = element.getField("Name");
   
                                Topic t=theServer.newTopic(name);
  -                             subcontext.rebind(name,t);                             
 
  +                             subcontext.rebind(name,t);
                        }
                        
   
  @@ -205,6 +211,9 @@
                                }
                        }
   
  +                     // Restore the persistent messages to thie queues.
  +                     theServer.persistenceManager.restore();
  +
                        iter = serverCfg.getElementsNamed("InvocationLayer");
                        while( iter.hasNext() ) {
                                
  @@ -236,8 +245,13 @@
   
                } catch (Exception e) {
                        System.err.println("Cannot start the JMS server ! 
"+e.getMessage());
  -                     System.err.println(e);
  +                     e.printStackTrace(System.err);
  +                     if( e instanceof JMSException ) {
  +                             System.err.println("Linked Exception:");
  +                             
((JMSException)e).getLinkedException().printStackTrace(System.err);
  +                     }
                }
   
        }
  +     SpyTxLog spyTxManager;
   }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.JMSException;
  
  import java.net.URL;
  import java.util.HashMap;
  import java.util.TreeSet;
  import java.util.Iterator;
  import java.util.LinkedList;
  
  import org.spydermq.xml.XElement;
  import org.spydermq.persistence.SpyTxLog;
  import org.spydermq.persistence.SpyMessageLog;
  import org.spydermq.SpyDestination;
  import org.spydermq.SpyMessage;
  
  /**
   *    This class manages all persistence related services.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class PersistenceManager {
  
        // The server this persistence manager is providing service for
        JMSServer server;
        // The configuration data for the manager.
        XElement configElement;
        // The directory where persistence data should be stored
        URL dataDirectory;
        // Log file used to store commited transactions.
        SpyTxLog spyTxLog;
        // Maps SpyDestinations to SpyMessageLogs
        HashMap messageLogs = new HashMap();
        // Maps (Long)txIds to LinkedList of Runnable tasks
        HashMap postCommitTasks = new HashMap();
  
        /**
         * PersistenceManager constructor.
         */
        public PersistenceManager(JMSServer server, XElement configElement) throws 
javax.jms.JMSException {
  
                try {
  
                        this.server = server;
                        this.configElement = configElement;
  
                        URL configFile = 
getClass().getClassLoader().getResource("spyderMQ.xml");
                        dataDirectory = new URL(configFile, 
configElement.getField("DataDirectory"));
                        URL txLogFile = new URL(dataDirectory, "transactions.dat");
                        spyTxLog = new SpyTxLog(txLogFile.getFile());
  
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
  
        
        public void add(org.spydermq.SpyDestination dest, org.spydermq.SpyMessage 
message, Long txId) throws javax.jms.JMSException {
  
                SpyMessageLog log;
  
                synchronized (messageLogs) {
                        log = (SpyMessageLog) messageLogs.get(dest);
                }
  
                if (log == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  
                synchronized (log) {
                        log.add(message, txId);
                }
  
        }
  
        
        public void addPostCommitTask(Long txId, Runnable task) throws 
javax.jms.JMSException {
  
                LinkedList tasks;
                synchronized (postCommitTasks) {
                        tasks = (LinkedList) postCommitTasks.get(txId);
                }
                if (tasks == null)
                        throw new javax.jms.JMSException("Transaction is not active.");
  
                synchronized (tasks) {
                        tasks.addLast(task);
                }
  
        }
  
        
        public void commitTx(Long txId) throws javax.jms.JMSException {
  
                LinkedList tasks;
                synchronized (postCommitTasks) {
                        tasks = (LinkedList) postCommitTasks.remove(txId);
                }
                if (tasks == null)
                        throw new javax.jms.JMSException("Transaction is not active.");
  
                spyTxLog.commitTx(txId);
  
                synchronized (tasks) {
                        Iterator iter = tasks.iterator();
                        while (iter.hasNext()) {
                                Runnable task = (Runnable) iter.next();
                                task.run();
                        }
                }
  
        }
  
        
        public Long createTx() throws javax.jms.JMSException {
                Long txId = spyTxLog.createTx();
                synchronized (postCommitTasks) {
                        postCommitTasks.put(txId, new LinkedList());
                }
                return txId;
        }
  
        
        public void initQueue(org.spydermq.SpyDestination dest) throws 
javax.jms.JMSException {
  
                try {
  
                        URL logFile = new URL(dataDirectory, dest.getName() + "-queue" 
+ ".dat");
                        SpyMessageLog log = new SpyMessageLog(logFile.getFile());
  
                        messageLogs.put(dest, log);
  
                } catch (javax.jms.JMSException e) {
                        throw e;
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
  
        
        public void remove(org.spydermq.SpyDestination dest, org.spydermq.SpyMessage 
message, Long txId) throws javax.jms.JMSException {
  
                SpyMessageLog log;
  
                synchronized (messageLogs) {
                        log = (SpyMessageLog) messageLogs.get(dest);
                }
  
                if (log == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  
                log.remove(message, null);
  
        }
        
        
        public void restore() throws javax.jms.JMSException {
  
                TreeSet commitedTXs = spyTxLog.restore();
                HashMap clone;
                synchronized (messageLogs) {
                        clone = (HashMap) messageLogs.clone();
                }
  
                Iterator iter = clone.keySet().iterator();
                while (iter.hasNext()) {
                        SpyDestination dest = (SpyDestination) iter.next();
  
                        JMSServerQueue q = server.getServerQueue(dest);
                        SpyMessageLog log = (SpyMessageLog) clone.get(dest);
  
                        SpyMessage rebuild[] = log.restore(commitedTXs);
  
                        //TODO: make sure this sync lock is good enough
                        synchronized (q) {
                                for (int i = 0; i < rebuild.length; i++) {
                                        q.restoreMessage(rebuild[i]);
                                        q.messageIdCounter = 
Math.max(q.messageIdCounter, rebuild[i].messageId + 1);
                                }
                        }
                }
  
        }
        
        public void rollbackTx(Long txId) throws javax.jms.JMSException {
  
                LinkedList tasks;
                synchronized (postCommitTasks) {
                        tasks = (LinkedList) postCommitTasks.remove(txId);
                }
                if (tasks == null)
                        throw new javax.jms.JMSException("Transaction is not active.");
  
                spyTxLog.rollbackTx(txId);
  
        }
  }
  
  
  

Reply via email to