User: hiram   
  Date: 01/01/10 05:57:46

  Modified:    src/java/org/spydermq/persistence SpyTxLog.java
                        SpyMessageLog.java SpyMessageLogTester.java
  Added:       src/java/org/spydermq/persistence SpyMessageQueue.java
  Log:
  Feature Add:  Faster recovery after a spyderMQ server failure
  Feature Add:  Better server scalability by moving message not in the working set to 
secondary storage.
  
  Revision  Changes    Path
  1.2       +56 -9     spyderMQ/src/java/org/spydermq/persistence/SpyTxLog.java
  
  Index: SpyTxLog.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/persistence/SpyTxLog.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyTxLog.java     2000/12/16 03:27:49     1.1
  +++ SpyTxLog.java     2001/01/10 13:57:45     1.2
  @@ -15,7 +15,7 @@
    * This is used to keep a log of commited transactions.
    *
    * @author: Hiram Chirino ([EMAIL PROTECTED])
  - * @version $Revision: 1.1 $    
  + * @version $Revision: 1.2 $    
    */
   public class SpyTxLog {
   
  @@ -24,6 +24,18 @@
        /////////////////////////////////////////////////////////////////////
        private ObjectIntegrityLog transactionLog;
        private long nextTransactionId = Long.MIN_VALUE;
  +     private Record record = new Record();
  +
  +     private final static byte TX_COMMITED = 1;
  +     private final static byte TX_CREATED = 0;
  +     private final static byte TX_PREPARED = 3;
  +     private final static byte TX_ROLLEDBACK = 2;
  +
  +     static class Record implements Serializable {
  +             public static final long serialVersionUID = 1;
  +             byte type;
  +             long txId;
  +     }
        
        /////////////////////////////////////////////////////////////////////
        // Constructors
  @@ -50,8 +62,12 @@
        synchronized public void commitTx(Long id) throws JMSException {
                                
                try {
  -                     transactionLog.add(id);
  +                     
  +                     record.type = TX_COMMITED;
  +                     record.txId = id.longValue();
  +                     transactionLog.add(record);
                        transactionLog.commit();
  +                     
                } catch ( IOException e ) {
                        throwJMSException("Could not create a new transaction.",e);
                }
  @@ -59,32 +75,63 @@
        }
        
        synchronized public Long createTx() throws JMSException {
  -             return new Long(nextTransactionId++);
  +             
  +             Long newId = new Long(nextTransactionId++);
  +             
  +             try {
  +                     
  +                     record.type = TX_CREATED;
  +                     record.txId = newId.longValue();
  +                     transactionLog.add(record);
  +                     transactionLog.commit();
  +                     
  +             } catch ( IOException e ) {
  +                     throwJMSException("Could not create a new transaction.",e);
  +             }
  +             
  +             return newId;
        }
        
        synchronized public java.util.TreeSet restore() throws JMSException {
                
  -             java.util.TreeSet items=null;
  +             java.util.Vector items=null;
                try {
  -                     items = transactionLog.toTreeSet();
  +                     items = transactionLog.toVector();
                } catch ( Exception e ) {
                        throwJMSException("Could not restore the transaction log.",e);
  -             }               
  +             }
   
  +             java.util.TreeSet commitedTxs = new java.util.TreeSet();
                long maxId = Long.MIN_VALUE;
                java.util.Iterator iter = items.iterator();
                while( iter.hasNext() ) {
  -                     Long l = (Long)iter.next();
  -                     if( l.longValue() > maxId )
  -                             maxId = l.longValue();
  +                     
  +                     Record r = (Record)iter.next();
  +                     if( r.txId > maxId )
  +                             maxId = r.txId;
  +
  +                     if( r.type == TX_COMMITED ) 
  +                             commitedTxs.add( new Long( maxId ) );
  +                     
                }
   
                nextTransactionId = maxId+1;
  -             return items;
  +             return commitedTxs;
                        
        }
   
        synchronized public void rollbackTx(Long txId) throws JMSException {
  +
  +             try {
  +                     
  +                     record.type = TX_ROLLEDBACK;
  +                     record.txId = txId.longValue();
  +                     transactionLog.add(record);
  +                     transactionLog.commit();
  +                     
  +             } catch ( IOException e ) {
  +                     throwJMSException("Could not create a new transaction.",e);
  +             }
                        
        }
        
  
  
  
  1.3       +38 -49    spyderMQ/src/java/org/spydermq/persistence/SpyMessageLog.java
  
  Index: SpyMessageLog.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/persistence/SpyMessageLog.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyMessageLog.java        2000/12/16 03:27:48     1.2
  +++ SpyMessageLog.java        2001/01/10 13:57:46     1.3
  @@ -19,7 +19,7 @@
    * provider failure.  Integrety is kept by the use of an ObjectIntegrityLog.
    *
    * @author: Hiram Chirino ([EMAIL PROTECTED])
  - * @version $Revision: 1.2 $    
  + * @version $Revision: 1.3 $    
    */
   public class SpyMessageLog {
   
  @@ -27,26 +27,23 @@
        // Attributes
        /////////////////////////////////////////////////////////////////////   
        private ObjectIntegrityLog transactionLog;
  -     private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
  -     private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
  +     private Record record = new Record();
   
  +     // Record Types:
  +     private static final byte MESSAGE_ADD_RECORD=0;
  +     private static final byte MESSAGE_REMOVE_RECORD=1;
  +
        /////////////////////////////////////////////////////////////////////
        // Helper Inner Classes
        /////////////////////////////////////////////////////////////////////   
  -     static class MessageAddedRecord implements Serializable {
  +     static class Record implements Serializable {
  +             public static final long serialVersionUID = 1;
  +             byte type;
                long messageId;
                boolean isTransacted;
                long transactionId;
  -             SpyMessage message;
        }
  -     
  -     static class MessageRemovedRecord implements Serializable {
  -             boolean isTransacted;
  -             long transactionId;
  -             long messageId;
  -     }
   
  -             
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
  @@ -58,7 +55,6 @@
                }
        }
   
  -     
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
  @@ -73,16 +69,16 @@
        synchronized public void add( SpyMessage message, Long transactionId ) throws 
JMSException {
                try{
                        
  -                     messageAddedRecord.message = message;
  -                     messageAddedRecord.messageId = message.messageId;
  +                     record.type = MESSAGE_ADD_RECORD;
  +                     record.messageId = message.messageId;
                        if( transactionId == null )     {
  -                             messageAddedRecord.isTransacted = false;
  +                             record.isTransacted = false;
                        } else {
  -                             messageAddedRecord.isTransacted = true;
  -                             messageAddedRecord.transactionId = 
transactionId.longValue();
  +                             record.isTransacted = true;
  +                             record.transactionId = transactionId.longValue();
                        }
                                
  -                     transactionLog.add(messageAddedRecord);
  +                     transactionLog.add(record);
                        transactionLog.commit();
                        
                } catch ( IOException e ) {
  @@ -94,14 +90,15 @@
        synchronized public void remove( SpyMessage message, Long transactionId ) 
throws JMSException {
                try{
                        
  -                     messageRemovedRecord.messageId = message.messageId;
  +                     record.type = MESSAGE_REMOVE_RECORD;
  +                     record.messageId = message.messageId;
                        if( transactionId == null ) {
  -                             messageRemovedRecord.isTransacted = false;
  +                             record.isTransacted = false;
                        } else {
  -                             messageRemovedRecord.isTransacted = true;
  -                             messageRemovedRecord.transactionId = 
transactionId.longValue();
  +                             record.isTransacted = true;
  +                             record.transactionId = transactionId.longValue();
                        }
  -                     transactionLog.add(messageRemovedRecord);
  +                     transactionLog.add(record);
                        transactionLog.commit();
                        
                } catch ( IOException e ) {
  @@ -109,37 +106,40 @@
                }
   
        }       
  +     
        
  -     synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws 
JMSException {
  +     
  +     private void throwJMSException(String message, Exception e) throws 
JMSException {
  +             JMSException newE = new JMSException(message);
  +             newE.setLinkedException(e);
  +             throw newE;             
  +     }
  +     
  +     synchronized public Long[] restore(java.util.TreeSet commited) throws 
JMSException {
   
  -             java.util.HashMap messageIndex = new java.util.HashMap();
  +             java.util.Vector messageIndex = new java.util.Vector();
                        
                try {   
                        ObjectIntegrityLog.IndexItem objects[] = 
transactionLog.toIndex();
                        
                        for( int i=0; i < objects.length; i++ ) {
                                
  -                             Object o = objects[i].record;
  -                             if( o instanceof MessageAddedRecord ) {
  +                             Record r = (Record)objects[i].record;
  +                             if( r.type == MESSAGE_ADD_RECORD ) {
                                        
  -                                     MessageAddedRecord r = (MessageAddedRecord)o;
  -                                     r.message.messageId = r.messageId;
  -
                                        if( r.isTransacted && !commited.contains(new 
Long(r.transactionId)) ) {
                                                // the TX this message was part of was 
not
                                                // commited... so drop this message
                                                continue;
                                        }
                                        
  -                                     messageIndex.put( new Long(r.messageId), 
objects[i]);
  +                                     messageIndex.add( new Long(r.messageId) );
                                        
  -                             } else if( o instanceof MessageRemovedRecord ) {
  +                             } else if( r.type == MESSAGE_REMOVE_RECORD ) {
                                        
  -                                     MessageRemovedRecord r = 
(MessageRemovedRecord)o;
  -
                                        if( r.isTransacted && !commited.contains(new 
Long(r.transactionId)) ) {
                                                // the TX this message was part of was 
not
  -                                             // commited... so drop this message
  +                                             // commited... so dont remove this 
message...
                                                continue;
                                        }
                                        
  @@ -152,19 +152,8 @@
                        throwJMSException("Could not rebuild the queue from the 
queue's tranaction log.",e);
                }
   
  -             SpyMessage rc[] = new SpyMessage[messageIndex.size()];
  -             java.util.Iterator iter = messageIndex.values().iterator();
  -             for( int i=0; iter.hasNext(); i++ ) {
  -                     ObjectIntegrityLog.IndexItem item = 
(ObjectIntegrityLog.IndexItem)iter.next();
  -                     rc[i] = ((MessageAddedRecord)item.record).message;
  -             }
  +             Long rc[] = new Long[messageIndex.size()];
  +             rc = (Long[])messageIndex.toArray(rc);
                return rc;              
  -     }       
  -     
  -     private void throwJMSException(String message, Exception e) throws 
JMSException {
  -             JMSException newE = new JMSException(message);
  -             newE.setLinkedException(e);
  -             throw newE;             
        }
  -     
   }
  
  
  
  1.3       +20 -8     
spyderMQ/src/java/org/spydermq/persistence/SpyMessageLogTester.java
  
  Index: SpyMessageLogTester.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/persistence/SpyMessageLogTester.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyMessageLogTester.java  2000/12/16 03:27:48     1.2
  +++ SpyMessageLogTester.java  2001/01/10 13:57:46     1.3
  @@ -9,11 +9,11 @@
   import org.spydermq.*;
   
   /**
  - * This class was used to perform unit testing on the SpyMessageLog/SpyTxLog
  + * This class was used to perform unit testing on the SpyMessageQueue/SpyTxLog
    * 
    *
    * @author: Hiram Chirino ([EMAIL PROTECTED])
  - * @version $Revision: 1.2 $    
  + * @version $Revision: 1.3 $    
    */
   public class SpyMessageLogTester {
   
  @@ -23,8 +23,8 @@
         */
        public static void main(java.lang.String[] args) throws Exception {
   
  -             SpyTxLog tm = new SpyTxLog("SpyTxManager1.dat");
  -             SpyMessageLog log = new SpyMessageLog("SpyMessageLog1.dat");
  +             SpyTxLog tm = new SpyTxLog("SpyTxManager4.dat");
  +             SpyMessageQueue log = new SpyMessageQueue(null, "SpyMessageLog4.dat", 
"SpyMessageLogDir4");
                
                try{    
   
  @@ -58,14 +58,26 @@
                        add(log, second+1, null);
                        
                        System.exit(0);
  +                     
  +             } catch ( Exception e ) {
  +                     e.printStackTrace();
  +                     if( e instanceof javax.jms.JMSException && 
((javax.jms.JMSException)e).getLinkedException()!=null ) {
  +                             
((javax.jms.JMSException)e).getLinkedException().printStackTrace();
  +                     }
  +                     
                } finally {
                        log.close();
  +                     tm.close();
                }
   
        }
   
  +
  +     
  +     
  +
   
  -     public static void add(SpyMessageLog log, long messageId, Long txid) throws 
Exception {
  +     public static void add(SpyMessageQueue log, long messageId, Long txid) throws 
Exception {
   
                SpyTextMessage m = new SpyTextMessage();
                m.messageId = messageId;
  @@ -73,9 +85,9 @@
                System.out.println("Adding message: "+m+",tx="+txid);
                log.add(m,txid);
   
  -     }       
  -     
  -     public static void remove(SpyMessageLog log, long messageId, Long txid) throws 
Exception {
  +     }
  +
  +     public static void remove(SpyMessageQueue log, long messageId, Long txid) 
throws Exception {
   
                SpyTextMessage m = new SpyTextMessage();
                m.messageId = messageId;
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/persistence/SpyMessageQueue.java
  
  Index: SpyMessageQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.persistence;
  
  import java.io.IOException;
  import java.io.FileInputStream;
  import java.io.FileOutputStream;
  import java.io.BufferedOutputStream;
  import java.io.File;
  import java.io.BufferedInputStream;
  import java.io.ObjectOutputStream;
  import java.io.ObjectInputStream;
  
  import javax.jms.JMSException;
  
  import org.spydermq.SpyMessage;
  import org.spydermq.server.PersistenceManager;
  import org.spydermq.Log;
  
  /**
   * This is used to persist SpyMessages arriving and leaving 
   * a queue.  It uses a log that can be used reconstruct the queue 
   * in case of provider failure.  Messages are stored in
   * a given directory.  A SpyMessageLog is used maintain
   * the list of (transactionalyt) valid messages in the queue.
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $    
   */
  public class SpyMessageQueue {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        File messageDirectory;
        SpyMessageLog messageLog;
        PersistenceManager persistenceManager;
  
        static class DeleteFileTask implements Runnable {
                
                DeleteFileTask(File f) {
                        file = f;
                }
                
                File file;
                public void run() {
        
                        if( !file.delete() )
                                Log.notice("Could not remove the file: "+file);
  
                }
        }
  
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public SpyMessageQueue(PersistenceManager pm, String fileName, String 
messageDir) throws JMSException {
  
                persistenceManager = pm;
                messageLog = new SpyMessageLog( fileName );
                messageDirectory = new File( messageDir );
                if( !messageDirectory.exists() ) {
                        messageDirectory.mkdirs();
                        messageDirectory.mkdir();
                }
                if( !messageDirectory.isDirectory() ) {
                        throwJMSException("Invalid message directory: 
"+messageDirectory, null);
                }
                
        }
  
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
        synchronized public void close() throws JMSException {
                messageLog.close();
        }
  
        
        
        synchronized public void remove( SpyMessage message, Long transactionId ) 
throws JMSException {
  
                messageLog.remove(message, transactionId);
                File f = messageIdToFile(message.messageId);
                
                if( persistenceManager == null || transactionId == null ) {
                        if( !f.delete() )
                                Log.notice("Could not remove the file: "+f);           
         
                } else {
                        persistenceManager.addPostCommitTask(transactionId,     new 
DeleteFileTask(f) );
                }
                
        }       
        
        synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws 
JMSException {
  
                Long messageIndex[] = messageLog.restore( commited );
                java.util.HashSet files = new java.util.HashSet();
  
                // Get a listing of all the files in the directory
                {
                        File t[] = messageDirectory.listFiles();
                        for( int i=0; i < t.length; i++ ) 
                                files.add( t[i] );
                }
  
                // Read in all the messages
                SpyMessage messages[] = new SpyMessage[ messageIndex.length ];
                for( int i=0; i < messageIndex.length; i++ ) {
  
                        File file = messageIdToFile( messageIndex[i].longValue() );
                        files.remove(file);
  
                        try {
                                
                                ObjectInputStream is = new ObjectInputStream(
                                        new BufferedInputStream(
                                                new FileInputStream( file ) ) );
  
                                messages[i] = (SpyMessage)is.readObject();
                                messages[i].messageId = messageIndex[i].longValue();
                                is.close();
                                        
                        } catch (Exception e ) {
                                throwJMSException("Could not restore a persisted 
message", e);
                        }
  
                }
  
                // All the files left in the listing are messages that should
                // be deleted
                java.util.Iterator iter = files.iterator();
                while( iter.hasNext() ) {
                        File f = (File)iter.next();
                        if( !f.delete() )
                                Log.notice("Could not remove the file: "+f);
                }
  
                
                return messages;
                
        }       
        
        private void throwJMSException(String message, Exception e) throws 
JMSException {
                JMSException newE = new JMSException(message);
                newE.setLinkedException(e);
                throw newE;             
        }
        
  
        synchronized public File messageIdToFile( long messageId ) throws JMSException 
{
  
                return new File(messageDirectory, "message_"+messageId+".dat");
                
        }
  
        synchronized public File add( SpyMessage message, Long transactionId ) throws 
JMSException {
  
                File f = messageIdToFile(message.messageId);
  
                try {
                        
                        ObjectOutputStream os = new ObjectOutputStream( 
                                new BufferedOutputStream( 
                                        new FileOutputStream(f) ) );
                        os.writeObject(message);
                        os.close();
                        
                        if( persistenceManager != null && transactionId != null ) {
                                persistenceManager.addPostRollbackTask(transactionId, 
new DeleteFileTask(f) );
                        }
                                
                } catch ( IOException e ) {
                        throwJMSException("Could not persist the message", e);
                }
                
                messageLog.add( message, transactionId );
                return f;
                
        }
  
  /**
   * Insert the method's description here.
   * Creation date: (1/10/01 7:27:11 AM)
   * @return java.io.File
   */
  public java.io.File getMessageDirectory() {
        return messageDirectory;
  }
  }
  
  
  

Reply via email to