User: hiram   
  Date: 00/11/22 09:43:33

  Added:       src/java/org/spydermq/persistence SpyMessageLog.java
                        SpyMessageLogTester.java TransactionLog.java
  Log:
  Added a crude persistence scheme to support persistent messages.
  It's Transactional behavior still needs some work.
  
  Revision  Changes    Path
  1.1                  spyderMQ/src/java/org/spydermq/persistence/SpyMessageLog.java
  
  Index: SpyMessageLog.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.persistence;
  
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;
  import java.io.BufferedInputStream;
  import java.io.BufferedOutputStream;
  import java.io.IOException;
  import org.spydermq.SpyMessage;
  import java.io.Serializable;
  import java.util.HashMap;
  
  import javax.jms.JMSException;
  
  /**
   * This is used to keep a log of SpyMessages arriving and leaving 
   * a queue.  The log can be used reconstruct the queue in case of
   * provider failure.  Transactional integrety is kept by the use
   * of a TransactionLog.
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $    
   */
  public class SpyMessageLog {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        private ObjectOutputStream out;
        private TransactionLog transactionLog;
        private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
        private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
  
  
        /////////////////////////////////////////////////////////////////////
        // Helper Inner classes
        /////////////////////////////////////////////////////////////////////
        static class MyObjectOutputStream extends ObjectOutputStream {
                MyObjectOutputStream( OutputStream os )  throws IOException {
                        super(os);
                }
                /**
                 * diable the writing of the stream header.
                 */
                protected void writeStreamHeader() {
                }
        }
        
        static class MyObjectInputStream extends ObjectInputStream      {
                MyObjectInputStream( InputStream is ) throws IOException {
                        super(is);
                }
                /**
                 * diable the reading of the stream header.
                 */
                protected void readStreamHeader() {
                }
        }
        
        static class MessageAddedRecord implements Serializable {
                long messageId;
                SpyMessage message;
        }
        
        static class MessageRemovedRecord implements Serializable {
                long messageId;
        }
  
        
        
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public SpyMessageLog(String fileName) throws JMSException {
                try {
                        transactionLog = new TransactionLog(fileName);  
                        out = new MyObjectOutputStream( new BufferedOutputStream( 
transactionLog.getOutputStream() ));
                } catch ( IOException e ) {
                        JMSException newE = new JMSException("Could not open the 
queue's tranaction log: "+fileName);
                        newE.setLinkedException(e);
                        throw newE;
                }
        }
  
        
  
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
        public void logAddMessage( SpyMessage message ) throws JMSException {
                try{
                        messageAddedRecord.message = message;
                        messageAddedRecord.messageId = message.messageId;
                        out.writeObject(messageAddedRecord);
                        out.reset();
                } catch ( IOException e ) {
                        JMSException newE = new JMSException("Could not write to the 
tranaction log.");
                        newE.setLinkedException(e);
                        throw newE;
                }
                
        }
        
        public void logRemoveMessage( SpyMessage message ) throws JMSException {
                try{    
                        messageRemovedRecord.messageId = message.messageId;
                        out.writeObject(messageRemovedRecord);
                        out.reset();
                } catch ( IOException e ) {
                        JMSException newE = new JMSException("Could not write to the 
queue's tranaction log.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
        
        public SpyMessage[] rebuildMessagesFromLog() throws JMSException {
                HashMap messages = new HashMap();
  
                try {   
                        ObjectInputStream in = new MyObjectInputStream( new 
BufferedInputStream( transactionLog.getInputStream() ));
                        try {
                        while (true) {
                                
                                Object o = in.readObject();
                                if( o instanceof MessageAddedRecord ) {
                                        
                                        MessageAddedRecord r = (MessageAddedRecord)o;
                                        r.message.messageId = r.messageId;
                                        messages.put( new Long(r.messageId), 
r.message);
                                        
                                } else if( o instanceof MessageRemovedRecord ) {
                                        
                                        MessageRemovedRecord r = 
(MessageRemovedRecord)o;
                                        messages.remove( new Long(r.messageId));
                                        
                                }
                                
                        }
                        } catch( java.io.EOFException e ) {
                        } 
                        in.close();
                } catch ( Exception e ) {
                        JMSException newE = new JMSException("Could not rebuild the 
queue from the queue's tranaction log.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
                SpyMessage rc[] = new SpyMessage[messages.size()];
                return (SpyMessage [])messages.values().toArray(rc);            
        }
  
        public void commit() throws JMSException {
                try {
                        out.flush();
                        transactionLog.commit();
                } catch ( IOException e ) {
                        JMSException newE = new JMSException("Could not commit to the 
queue transaction log");
                        newE.setLinkedException(e);
                        throw newE;
                }
                
        }
        
        public void rollback() throws JMSException {
                try {
                        out.flush();
                        transactionLog.rollback();
                } catch ( IOException e ) {
                        JMSException newE = new JMSException("Could not rollback the 
queue's tranaction log.");
                        newE.setLinkedException(e);
                        throw newE;
                }               
        }
        
        public void close() throws JMSException {
                try{
                        transactionLog.close();         
                } catch ( IOException e ) {
                        JMSException newE = new JMSException("Could not close the 
queue's tranaction log.");
                        newE.setLinkedException(e);
                        throw newE;
                }
        }
  
  }
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/persistence/SpyMessageLogTester.java
  
  Index: SpyMessageLogTester.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.persistence;
  
  import org.spydermq.*;
  
  /**
   * This class was used to perform unit testing on the SpyMessageLog
   * 
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $    
   */
  public class SpyMessageLogTester {
  
        /**
         * Starts the application.
         * @param args an array of command-line arguments
         */
        public static void main(java.lang.String[] args) throws Exception {
  
                SpyMessageLog log = new SpyMessageLog("SpyMessage20.log");
                
                try{    
  
                                
                        SpyMessage[] queue = log.rebuildMessagesFromLog();
  
                        System.out.println("Recovered :"+queue.length+" message from 
the message log");
                        long maxMessageId=0;
                        for( int i=0; i < queue.length; i++ ) {
                                System.out.println("  #"+i+": "+queue[i]);
                                maxMessageId = Math.max(maxMessageId, 
queue[i].messageId );
                        }
                        
                        long first = ++maxMessageId;
                        add(log, first);
                        long second = ++maxMessageId;
                        add(log, second);
                        remove(log, first);
  
                        System.out.println("Commiting");
                        log.commit();
  
                        add(log, first);
  
                        System.out.println("Rolling back");
                        log.rollback();
  
                        add(log, second+1);
  
                        System.out.println("Commiting");
                        log.commit();
                        
                        System.exit(0);
                } finally {
                        log.close();
                }
  
        }
  
        public static void add(SpyMessageLog log, long messageId) throws Exception {
  
                SpyTextMessage m = new SpyTextMessage();
                m.messageId = messageId;
                m.setText("Hello World #"+m.messageId);
                System.out.println("Adding message: "+m);
                log.logAddMessage(m);
  
        }
        
        public static void remove(SpyMessageLog log, long messageId) throws Exception {
  
                SpyTextMessage m = new SpyTextMessage();
                m.messageId = messageId;
                m.setText("Hello World #"+m.messageId);
                System.out.println("Removing message: "+m);
                log.logRemoveMessage(m);
  
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/persistence/TransactionLog.java
  
  Index: TransactionLog.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.persistence;
  
  import java.io.RandomAccessFile;
  import java.io.OutputStream;
  import java.io.InputStream;
  import java.io.IOException;
  import java.io.File;
  
  
  /**
   * This class is used to log tansactions.
   *
   * The InputStream returned by getInputStream() will read 
   * data placed into the log with the OutputStream returned by
   * getOutputStream().  The EOF for the InputStream is the 
   * last commited point of the OutputStream.
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $    
   */
  public class TransactionLog {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        private static final int HEADER_SIZE=16; // in bytes
  
        // Header related stuff 
        private long firstRecordPos;
        private long nextRecordPos;
        private byte headerBytes[]=new byte[HEADER_SIZE];
        
        private RandomAccessFile raf;
  
        private LogOutputStream outputStream;
        private LogInputStream inputStream;
  
        
        /////////////////////////////////////////////////////////////////////
        // Helper Inner Classes
        /////////////////////////////////////////////////////////////////////
        class LogInputStream extends InputStream {
                boolean closed = false;
                long inputPos = 0;
                public void close() throws IOException {
                        super.close();
                        closed = true;
                }
                public int read() throws IOException {
                        inputPos = Math.max(inputPos, firstRecordPos);
                        int rc = TransactionLog.this.read(inputPos);
                        if( rc >= 0 )
                                inputPos ++;
                        return rc;
                }
                public int read(byte bytes[], int off, int len) throws IOException {
                        inputPos = Math.max(inputPos, firstRecordPos);
                        int rc = TransactionLog.this.read(inputPos, bytes, off, len);
                        if( rc >= 0 )
                                inputPos += rc;
                        return rc;
                }
        } 
        
        class LogOutputStream extends OutputStream {
                boolean closed = false;
                public void close() throws IOException {
                        super.close();
                        closed = true;
                }
                public void write(int b) throws IOException {
                        TransactionLog.this.write( (byte)b );
                }
                public void write(byte bytes[], int off, int len) throws IOException {
                        TransactionLog.this.write( bytes, off, len );
                }
        }
  
  
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public TransactionLog(String fileName) throws IOException {
                File f = new File(fileName);
                boolean exists = f.isFile();
                
                raf = new RandomAccessFile(f, "rw");
                if( exists ) {
                        loadHeader();
                } else {
                        initHeader();
                }
        }
  
  
        /////////////////////////////////////////////////////////////////////
        // Public Methods 
        /////////////////////////////////////////////////////////////////////   
        public OutputStream getOutputStream() throws IOException {
                if ( outputStream==null || outputStream.closed ) {
                        outputStream = new LogOutputStream();
                }
                return outputStream;
        }
  
        public InputStream getInputStream() {
                if ( inputStream==null || inputStream.closed ) {
                        inputStream = new LogInputStream();
                }
                return inputStream;
        }
        
        public void commit() throws IOException {
  
                headerBytes[0] = (byte)((firstRecordPos >>> 56) & 0xFF);
                headerBytes[1] = (byte)((firstRecordPos >>> 48) & 0xFF);
                headerBytes[2] = (byte)((firstRecordPos >>> 40) & 0xFF);
                headerBytes[3] = (byte)((firstRecordPos >>> 32) & 0xFF);
                headerBytes[4] = (byte)((firstRecordPos >>> 24) & 0xFF);
                headerBytes[5] = (byte)((firstRecordPos >>> 16) & 0xFF);
                headerBytes[6] = (byte)((firstRecordPos >>>  8) & 0xFF);
                headerBytes[7] = (byte)((firstRecordPos >>>  0) & 0xFF);
                headerBytes[8] = (byte)((nextRecordPos >>> 56) & 0xFF);
                headerBytes[9] = (byte)((nextRecordPos >>> 48) & 0xFF);
                headerBytes[10] =(byte)((nextRecordPos >>> 40) & 0xFF);
                headerBytes[11] =(byte)((nextRecordPos >>> 32) & 0xFF);
                headerBytes[12] =(byte)((nextRecordPos >>> 24) & 0xFF);
                headerBytes[13] =(byte)((nextRecordPos >>> 16) & 0xFF);
                headerBytes[14] =(byte)((nextRecordPos >>>  8) & 0xFF);
                headerBytes[15] =(byte)((nextRecordPos >>>  0) & 0xFF);
  
                raf.seek(0);
                raf.write(headerBytes);
        }
        
        public void rollback() throws IOException {
                loadHeader();
        }
        
        public void close() throws IOException {
                raf.close();
                raf = null;
        }
  
        /////////////////////////////////////////////////////////////////////
        // Private Methods 
        /////////////////////////////////////////////////////////////////////          
 
        private long getBytesLeft(long offset) {
  
                return nextRecordPos-offset;
                
        }
        
        private void initHeader() throws IOException {
                
                firstRecordPos = HEADER_SIZE;
                nextRecordPos = HEADER_SIZE;
  
                commit();
        }
        
        private void loadHeader() throws IOException {
  
                raf.seek(0);
                firstRecordPos = raf.readLong();
                nextRecordPos = raf.readLong();
                
        }
        
        private int read(long offset) throws IOException {
  
                if( offset >= nextRecordPos )
                        return -1;
                
                if( raf.getFilePointer() != offset ) {
                        raf.seek(offset);
                }
  
                int rc = raf.read();
                return rc;
                
        }
        
        private int read(long offset, byte bytes[], int off, int len) throws 
IOException {
  
                if( offset >= nextRecordPos )
                        return -1;
  
                len = (int)Math.min(len, getBytesLeft(offset));
                
                if( raf.getFilePointer() != offset ) {
                        raf.seek(offset);
                }
  
                int rc = raf.read(bytes, off, len);
                return rc;
                
        }
        
        private void write(byte []record, int off, int len) throws IOException {
                if( raf.getFilePointer() != nextRecordPos ) {
                        raf.seek(nextRecordPos);
                }
                
                raf.write(record, off, len);
                nextRecordPos+=len;
        }
        
        private void write(byte b) throws IOException {
                
                if( raf.getFilePointer() != nextRecordPos ) {
                        raf.seek(nextRecordPos);
                }
                
                raf.write(b);
                nextRecordPos++;
        }
        
  }
  
  
  

Reply via email to