User: chirino 
  Date: 01/07/10 19:52:17

  Added:       src/main/org/jbossmq/pm/logged IntegrityLog.java
                        ObjectIntegrityLog.java PersistenceManager.java
                        PersistenceManagerMBean.java SpyMessageLog.java
                        SpyMessageLogTester.java SpyTxLog.java
  Log:
  These are the 3 PersistenceManager implementations we have so far.  They are now all 
configured via JMX
  
  Revision  Changes    Path
  1.1                  jbossmq/src/main/org/jbossmq/pm/logged/IntegrityLog.java
  
  Index: IntegrityLog.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.pm.logged;
  
  import java.io.RandomAccessFile;
  import java.io.OutputStream;
  import java.io.InputStream;
  import java.io.IOException;
  import java.io.File;
  
  
  /**
   * A log file that guarantees the integrity of its contents up to the
   * last commit point.
   *
   * The file starts with a 16-byte header holding two big-endian longs: the
   * position of the first record and the position just past the last
   * committed record.  Bytes written through the OutputStream returned by
   * getOutputStream() are appended after the header, but they only become
   * part of the durable log once commit() rewrites the header.
   *
   * The InputStream returned by getInputStream() reads the data placed into
   * the log with that OutputStream.  Its EOF is the current write position
   * of this instance; after the file is reopened (or after rollback()) that
   * is the last committed position.
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $
   */
  public class IntegrityLog {

      /////////////////////////////////////////////////////////////////////
      // Attributes
      /////////////////////////////////////////////////////////////////////
      private static final int HEADER_SIZE = 16; // in bytes

      // Header related stuff
      private long firstRecordPos;
      private long nextRecordPos;
      private byte headerBytes[] = new byte[HEADER_SIZE];

      // null once close() has been called
      private RandomAccessFile raf;

      private LogOutputStream outputStream;
      private LogInputStream inputStream;


      /////////////////////////////////////////////////////////////////////
      // Helper Inner Classes
      /////////////////////////////////////////////////////////////////////

      /**
       * Reads the record area of the log, starting at the first record and
       * ending at the log's current write position.
       */
      class LogInputStream extends InputStream {

          boolean closed = false;
          long inputPos = 0;

          /** Returns the file offset of the next byte this stream will read. */
          public long getFilePointer() {
              return inputPos;
          }

          public void close() throws IOException {
              super.close();
              closed = true;
          }

          public int read() throws IOException {
              // never read inside the header
              inputPos = Math.max(inputPos, firstRecordPos);
              int rc = IntegrityLog.this.read(inputPos);
              if (rc >= 0)
                  inputPos++;
              return rc;
          }

          public int read(byte bytes[], int off, int len) throws IOException {
              // never read inside the header
              inputPos = Math.max(inputPos, firstRecordPos);
              int rc = IntegrityLog.this.read(inputPos, bytes, off, len);
              if (rc >= 0)
                  inputPos += rc;
              return rc;
          }
      }

      /**
       * Appends bytes at the log's current write position.  Nothing written
       * here is durable until IntegrityLog.commit() is called.
       */
      class LogOutputStream extends OutputStream {

          boolean closed = false;

          /** Returns the file offset at which the next byte will be written. */
          public long getFilePointer() {
              return nextRecordPos;
          }

          public void close() throws IOException {
              super.close();
              closed = true;
          }

          public void write(int b) throws IOException {
              IntegrityLog.this.write((byte) b);
          }

          public void write(byte bytes[], int off, int len) throws IOException {
              IntegrityLog.this.write(bytes, off, len);
          }
      }


      /////////////////////////////////////////////////////////////////////
      // Constructor
      /////////////////////////////////////////////////////////////////////

      /**
       * Opens the log stored in the given file, creating it if needed.
       *
       * A file that is missing, or shorter than the header, is initialized
       * as an empty log: a file that short cannot contain a completed
       * commit, so there is nothing to lose.
       *
       * @param fileName path of the log file
       * @throws IOException if the file cannot be opened or its header is
       *         not valid
       */
      public IntegrityLog(String fileName) throws IOException {
          File f = new File(fileName);
          boolean hasHeader = f.isFile() && f.length() >= HEADER_SIZE;

          raf = new RandomAccessFile(f, "rw");
          boolean ready = false;
          try {
              if (hasHeader) {
                  loadHeader();
              } else {
                  initHeader();
              }
              ready = true;
          } finally {
              if (!ready) {
                  // do not leak the file handle when the header is unusable
                  try {
                      raf.close();
                  } catch (IOException ignored) {
                      // the original failure is the one worth reporting
                  }
                  raf = null;
              }
          }
      }

      /////////////////////////////////////////////////////////////////////
      // Public Methods
      /////////////////////////////////////////////////////////////////////

      /**
       * Returns the stream used to read the log.  The same stream is handed
       * out until it is closed; after that a new one, positioned at the
       * first record, is created.
       */
      public LogInputStream getInputStream() {
          if (inputStream == null || inputStream.closed) {
              inputStream = new LogInputStream();
          }
          return inputStream;
      }

      /**
       * Returns the stream used to append to the log.  The same stream is
       * handed out until it is closed.
       */
      public LogOutputStream getOutputStream() throws IOException {
          if (outputStream == null || outputStream.closed) {
              outputStream = new LogOutputStream();
          }
          return outputStream;
      }

      /**
       * Makes everything written so far part of the durable log by
       * rewriting the header.
       *
       * @throws IOException if the log is closed or the file cannot be
       *         written
       */
      public void commit() throws IOException {
          RandomAccessFile file = file();

          // The record bytes must reach the disk before the header that
          // points at them, otherwise a crash could leave a header that
          // references data which was never written.
          file.getFD().sync();

          putLong(headerBytes, 0, firstRecordPos);
          putLong(headerBytes, 8, nextRecordPos);

          // one write call for the whole header, as before
          file.seek(0);
          file.write(headerBytes);
          file.getFD().sync();
      }

      /**
       * Discards everything written since the last commit by reloading the
       * header from the file.
       *
       * @throws IOException if the log is closed or the header cannot be
       *         read
       */
      public void rollback() throws IOException {
          loadHeader();
      }

      /**
       * Closes the underlying file.  Calling it again has no effect.
       */
      public void close() throws IOException {
          if (raf == null) {
              return;
          }
          RandomAccessFile file = raf;
          raf = null;
          file.close();
      }

      /////////////////////////////////////////////////////////////////////
      // Private Methods
      /////////////////////////////////////////////////////////////////////

      /** Returns the open file, or fails with an IOException after close(). */
      private RandomAccessFile file() throws IOException {
          if (raf == null) {
              throw new IOException("The integrity log has been closed.");
          }
          return raf;
      }

      /** Stores value as 8 big-endian bytes starting at target[at]. */
      private static void putLong(byte target[], int at, long value) {
          for (int i = 7; i >= 0; i--) {
              target[at + i] = (byte) (value & 0xFF);
              value >>>= 8;
          }
      }

      /** Moves the file pointer to pos unless it is already there. */
      private void seekTo(long pos) throws IOException {
          RandomAccessFile file = file();
          if (file.getFilePointer() != pos) {
              file.seek(pos);
          }
      }

      /** Number of readable bytes between offset and the write position. */
      private long getBytesLeft(long offset) {
          return nextRecordPos - offset;
      }

      /** Writes the header of an empty log. */
      private void initHeader() throws IOException {
          firstRecordPos = HEADER_SIZE;
          nextRecordPos = HEADER_SIZE;

          commit();
      }

      /**
       * Reads both header positions from the file.  The in-memory positions
       * are only replaced when the values read are plausible.
       */
      private void loadHeader() throws IOException {
          RandomAccessFile file = file();

          file.seek(0);
          long first = file.readLong();
          long next = file.readLong();

          if (first < HEADER_SIZE || next < first || next > file.length()) {
              throw new IOException("Corrupt log header: firstRecordPos=" + first
                      + ", nextRecordPos=" + next
                      + ", file length=" + file.length());
          }

          firstRecordPos = first;
          nextRecordPos = next;
      }

      /** Reads one byte at offset, or returns -1 at the write position. */
      private int read(long offset) throws IOException {
          if (offset >= nextRecordPos)
              return -1;

          seekTo(offset);
          return raf.read();
      }

      /**
       * Reads up to len bytes at offset, never past the write position.
       * Returns -1 when offset is already at the write position.
       */
      private int read(long offset, byte bytes[], int off, int len) throws IOException {
          if (offset >= nextRecordPos)
              return -1;

          len = (int) Math.min(len, getBytesLeft(offset));

          seekTo(offset);
          return raf.read(bytes, off, len);
      }

      /** Appends len bytes of record and advances the write position. */
      private void write(byte[] record, int off, int len) throws IOException {
          seekTo(nextRecordPos);

          raf.write(record, off, len);
          nextRecordPos += len;
      }

      /** Appends one byte and advances the write position. */
      private void write(byte b) throws IOException {
          seekTo(nextRecordPos);

          raf.write(b);
          nextRecordPos++;
      }

  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/pm/logged/ObjectIntegrityLog.java
  
  Index: ObjectIntegrityLog.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.pm.logged;
  
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;
  import java.io.IOException;
  import java.io.Serializable;
  import java.io.BufferedInputStream;
  
  import javax.jms.JMSException;
  
  
  /**
   * This is used to keep a log of Serializable Objects with guaranteed
   * integrity.
   *
   * Every object add()ed to the log without an exception is guaranteed
   * to be recovered by any of the to*() methods.  The log file will not be
   * corrupted if the process dies in the middle of an add().
   *
   * Objects are serialized back to back into an IntegrityLog, without the
   * usual serialization stream header, so that reading can start at the
   * first record of the log file.
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $
   */
  public class ObjectIntegrityLog {

      /////////////////////////////////////////////////////////////////////
      // Attributes
      /////////////////////////////////////////////////////////////////////
      private IntegrityLog.LogOutputStream logOutputStream;
      private ObjectOutputStream out;
      private IntegrityLog transactionLog;

      /**
       * A logged object together with the log offset recorded for it.
       */
      static class IndexItem {
          long recordOffset;
          Object record;
      }


      /////////////////////////////////////////////////////////////////////
      // Helper Inner classes
      /////////////////////////////////////////////////////////////////////
      static class MyObjectOutputStream extends ObjectOutputStream {
          MyObjectOutputStream(OutputStream os) throws IOException {
              super(os);
          }

          /**
           * Disables the writing of the stream header.
           */
          protected void writeStreamHeader() {
          }
      }

      static class MyObjectInputStream extends ObjectInputStream {
          MyObjectInputStream(InputStream is) throws IOException {
              super(is);
          }

          /**
           * Disables the reading of the stream header.
           */
          protected void readStreamHeader() {
          }
      }

      /////////////////////////////////////////////////////////////////////
      // Constructor
      /////////////////////////////////////////////////////////////////////

      /**
       * Opens (or creates) the object log stored in the given file.
       *
       * @param fileName path of the log file
       * @throws IOException if the underlying IntegrityLog cannot be opened
       */
      public ObjectIntegrityLog(String fileName) throws IOException {
          transactionLog = new IntegrityLog(fileName);
          logOutputStream = transactionLog.getOutputStream();
          out = new MyObjectOutputStream(logOutputStream);
      }

      /////////////////////////////////////////////////////////////////////
      // Public Methods
      /////////////////////////////////////////////////////////////////////

      /** Commits the underlying log. */
      public void commit() throws IOException {
          transactionLog.commit();
      }

      /** Rolls the underlying log back to its last commit point. */
      public void rollback() throws IOException {
          transactionLog.rollback();
      }

      /** Closes the underlying log. */
      public void close() throws IOException {
          transactionLog.close();
      }

      /**
       * Serializes an object to the end of the log.  The record is not
       * committed by this call.
       *
       * @param o the object to append
       * @return the object paired with the log offset it was written at
       * @throws IOException if the object cannot be written
       */
      public IndexItem add(Object o) throws IOException {
          IndexItem item = new IndexItem();
          item.record = o;
          item.recordOffset = logOutputStream.getFilePointer();

          out.writeObject(o);
          // forget already-written objects so every record is self-contained
          out.reset();
          out.flush();

          return item;
      }

      /** Returns every object in the log, in log order. */
      public Object[] toArray() throws IOException, ClassNotFoundException {
          java.util.LinkedList ll = new java.util.LinkedList();
          readAllInto(ll);
          return (Object[]) ll.toArray(new Object[ll.size()]);
      }

      /** Returns every object in the log as a HashSet. */
      public java.util.HashSet toHashSet() throws IOException, ClassNotFoundException {
          java.util.HashSet hash = new java.util.HashSet();
          readAllInto(hash);
          return hash;
      }

      /**
       * Returns every object in the log together with the input position
       * observed just before the object was read.
       */
      public IndexItem[] toIndex() throws IOException, ClassNotFoundException {
          java.util.LinkedList ll = new java.util.LinkedList();

          // Deliberately not buffered: a buffer would read ahead and make
          // logStream.getFilePointer() useless as a record offset.
          IntegrityLog.LogInputStream logStream = transactionLog.getInputStream();
          ObjectInputStream in = new MyObjectInputStream(logStream);

          try {
              while (true) {
                  IndexItem i = new IndexItem();
                  i.recordOffset = logStream.getFilePointer();
                  i.record = in.readObject();
                  ll.addLast(i);
              }
          } catch (java.io.EOFException endOfLog) {
              // normal termination: the end of the log was reached
          } finally {
              // Always close: the log hands out the same input stream until
              // it is closed, so leaving it open after a failure would make
              // the next reader resume in the middle of the log.
              in.close();
          }

          return (IndexItem[]) ll.toArray(new IndexItem[ll.size()]);
      }

      /** Returns every object in the log as a TreeSet. */
      public java.util.TreeSet toTreeSet() throws IOException, ClassNotFoundException {
          java.util.TreeSet treeSet = new java.util.TreeSet();
          readAllInto(treeSet);
          return treeSet;
      }

      /** Returns every object in the log as a Vector, in log order. */
      public java.util.Vector toVector() throws IOException, ClassNotFoundException {
          java.util.Vector vector = new java.util.Vector();
          readAllInto(vector);
          return vector;
      }

      /////////////////////////////////////////////////////////////////////
      // Private Methods
      /////////////////////////////////////////////////////////////////////

      /**
       * Reads the log from its first record to its end and adds every
       * object to the given collection.
       */
      private void readAllInto(java.util.Collection sink) throws IOException, ClassNotFoundException {
          ObjectInputStream in = new MyObjectInputStream(
                  new BufferedInputStream(transactionLog.getInputStream()));
          try {
              while (true) {
                  sink.add(in.readObject());
              }
          } catch (java.io.EOFException endOfLog) {
              // normal termination: the end of the log was reached
          } finally {
              // Always close: the log hands out the same input stream until
              // it is closed, so leaving it open after a failure would make
              // the next reader resume in the middle of the log.
              in.close();
          }
      }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/pm/logged/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.pm.logged;
  
  import javax.jms.JMSException;
  
  import java.net.URL;
  import java.util.HashMap;
  import java.util.TreeSet;
  import java.util.Iterator;
  import java.util.LinkedList;
  
  import org.jbossmq.xml.XElement;
  import org.jbossmq.server.JMSServer;
  import org.jbossmq.server.JMSDestination;
  import org.jbossmq.SpyMessage;
  import org.jbossmq.SpyDestination;
  
  
  import javax.naming.InitialContext;
  import org.jbossmq.pm.TxManager;
  import org.jboss.util.ServiceMBeanSupport;
  import javax.management.*;
  import org.jbossmq.ConnectionToken;
  
  /**
   *    This class manages all persistence related services.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   *
   *    @version $Revision: 1.1 $
   */
  public class PersistenceManager 
        extends ServiceMBeanSupport  
        implements org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean, 
MBeanRegistration {
  
  
  
        private String dataDirectory;
        // Log file used to store commited transactions.
        SpyTxLog spyTxLog;
        // Maps SpyDestinations to SpyMessageLogs
        HashMap messageLogs = new HashMap();
  
        static class LogInfo {
                SpyMessageLog log;
                SpyDestination destination;
                String queueId;
  
                LogInfo(SpyMessageLog log, SpyDestination destination, String queueId) 
{
                        this.log=log;
                        this.destination=destination;
                        this.queueId=queueId;
                }
  
        }
  
  
  
        public Long createPersistentTx() throws javax.jms.JMSException {
                return spyTxLog.createTx();
        }
  
        public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
                spyTxLog.commitTx(txId);
        }
  
        public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
                spyTxLog.rollbackTx(txId);
        }
  
  
  
        public void initQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  
                try {
  
                        URL logFile = new URL(dataDirURL, 
dest.toString()+"-"+queueId+".dat");
                        SpyMessageLog log = new SpyMessageLog(logFile.getFile());
  
                        LogInfo info = new LogInfo(log, dest, queueId);
  
                        messageLogs.put(""+dest+"-"+queueId, info);
  
                } catch (javax.jms.JMSException e) {
                        throw e;
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
  
        public void destroyQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  
                try {
  
                        URL logFile = new URL(dataDirURL, 
dest.toString()+"-"+queueId+".dat");
                        java.io.File file = new java.io.File(logFile.getFile());
  
                        SpyMessageLog log = 
(SpyMessageLog)messageLogs.remove(""+dest+"-"+queueId);
                        if( log == null )
                                throw new JMSException("The persistence log was never 
initialized");
                        log.close();
  
                        file.delete();
  
                } catch (javax.jms.JMSException e) {
                        throw e;
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
  
        public void add(String queueId, org.jbossmq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
  
                LogInfo logInfo;
  
                synchronized (messageLogs) {
                        logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
                }
  
                if (logInfo == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  
                logInfo.log.add(message, txId);
  
        }
  
        public void remove(String queueId, org.jbossmq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
  
                LogInfo logInfo;
  
                synchronized (messageLogs) {
                        logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
                }
  
                if (logInfo == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  
                logInfo.log.remove(message, txId);
  
        }
  
        // The directory where persistence data should be stored
        URL dataDirURL;
        TxManager txManager;
  
  /**
   * Insert the method's description here.
   * Creation date: (6/27/2001 12:53:12 AM)
   * @return java.lang.String
   */
  public java.lang.String getDataDirectory() {
        return dataDirectory;
  }
  
        public String getName() {
                return "JBossMQ-PersistenceManager";
        }
  
  /**
   * getTxManager method comment.
   */
  public org.jbossmq.pm.TxManager getTxManager() {
        return txManager;
  }
  
        public void initService() throws Exception {
  
                URL configFile = getClass().getClassLoader().getResource("jboss.jcml");
                
                dataDirURL = new URL(configFile, dataDirectory);
                URL txLogFile = new URL(dataDirURL, "transactions.dat");
                spyTxLog = new SpyTxLog(txLogFile.getFile());
                
            //Get an InitialContext
                JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );                
                server.setPersistenceManager(this);
                
        }
  
        public void restore(org.jbossmq.server.JMSServer server) throws 
javax.jms.JMSException {
  
                TreeSet commitedTXs = spyTxLog.restore();
                HashMap clone;
                synchronized (messageLogs) {
                        clone = (HashMap) messageLogs.clone();
                }
  
                Iterator iter = clone.values().iterator();
                while (iter.hasNext()) {
  
                        LogInfo logInfo = (LogInfo)iter.next();
  
                        JMSDestination q = 
server.getJMSDestination(logInfo.destination);
  
                        SpyMessage rebuild[] = logInfo.log.restore(commitedTXs);
  
                        //TODO: make sure this lock is good enough
                        synchronized (q) {
                                for (int i = 0; i < rebuild.length; i++) {
                                        q.restoreMessage(rebuild[i], logInfo.queueId);
                                }
                        }
                }
  
        }
  
  /**
   * Insert the method's description here.
   * Creation date: (6/27/2001 12:53:12 AM)
   * @param newDataDirectory java.lang.String
   */
  public void setDataDirectory(java.lang.String newDataDirectory) {
        dataDirectory = newDataDirectory;
  }
  
        public void startService() throws Exception {
  
                JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );                
                restore(server);
                
        }
  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/pm/logged/PersistenceManagerMBean.java
  
  Index: PersistenceManagerMBean.java
  ===================================================================
  package org.jbossmq.pm.logged;
  
  /*
   * jBoss, the OpenSource EJB server
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  
  /**
   * MBean interface for the JBossMQ JMX service.
   *
   * Exposes the data directory of the logged persistence manager as a
   * read/write attribute.
   *
   *   @author Vincent Sheffer ([EMAIL PROTECTED])
   *   @version $Revision: 1.1 $
   */
  public interface PersistenceManagerMBean
     extends org.jboss.util.ServiceMBean
  {
     // Constants -----------------------------------------------------

     // NOTE(review): this looks like the name of the JBossMQ service itself
     // rather than one specific to this persistence manager - confirm.
     String OBJECT_NAME = ":service=JBossMQ";

     // Public --------------------------------------------------------

     /** Returns the directory the persistence data is stored in. */
     String getDataDirectory();

     /** Sets the directory the persistence data is stored in. */
     void setDataDirectory(String newDataDirectory);
  }
  
  
  
  1.1                  jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLog.java
  
  Index: SpyMessageLog.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.pm.logged;
  
  import java.io.IOException;
  import java.io.Serializable;
  
  
  
  import javax.jms.JMSException;
  
  import org.jbossmq.SpyMessage;
  
  /**
   * This is used to keep a log of SpyMessages arriving and leaving 
   * a queue.  The log can be used to reconstruct the queue in case of
   * provider failure.  Integrity is kept by the use of an ObjectIntegrityLog.
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $    
   */
  public class SpyMessageLog {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////   
        private ObjectIntegrityLog transactionLog;
        private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
        private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
  
        /////////////////////////////////////////////////////////////////////
        // Helper Inner Classes
        /////////////////////////////////////////////////////////////////////   
        /**
         * Log entry written when a message is added to the queue.
         */
        static class MessageAddedRecord implements Serializable {
                private final static long serialVersionUID = 235726945332013954L;

                // id of the message carried by this entry
                long messageId;
                // the message itself
                SpyMessage message;
                // true when the add was part of a transaction
                boolean isTransacted;
                // id of that transaction; only set when isTransacted is true
                long transactionId;
        }
        
        /**
         * Log entry written when a message is removed from the queue.
         */
        static class MessageRemovedRecord implements Serializable {
                private final static long serialVersionUID = 235726945332013955L;

                // id of the message that was removed
                long messageId;
                // true when the removal was part of a transaction
                boolean isTransacted;
                // id of that transaction; only set when isTransacted is true
                long transactionId;
        }
  
                
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        /**
         * Opens the message log stored in the given file.
         *
         * @param fileName path of the log file
         * @throws JMSException if the underlying ObjectIntegrityLog cannot
         *         be opened; the IOException is passed to throwJMSException
         */
        public SpyMessageLog(String fileName) throws JMSException {
                try {
                        transactionLog = new ObjectIntegrityLog(fileName);
                } catch ( IOException e ) {
                        throwJMSException("Could not open the queue's tranaction log: "+fileName,e);
                }
        }
  
        
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
        synchronized public void close() throws JMSException {
                try{
                        transactionLog.close();         
                } catch ( IOException e ) {
                        throwJMSException("Could not close the queue's tranaction 
log.",e);
                }
        }
  
        /**
         * Appends a "message added" entry to the log and commits it.
         *
         * @param message the message that was added to the queue
         * @param transactionId the transaction the add belongs to, or null
         *        when it is not transacted
         * @throws JMSException if the log cannot be written; the IOException
         *         is passed to throwJMSException
         */
        synchronized public void add( SpyMessage message, Long transactionId ) throws JMSException {
                try {
                        // one record instance is reused for every entry
                        MessageAddedRecord entry = messageAddedRecord;
                        entry.message = message;
                        entry.messageId = message.messageId;

                        boolean transacted = transactionId != null;
                        entry.isTransacted = transacted;
                        if( transacted ) {
                                entry.transactionId = transactionId.longValue();
                        }

                        transactionLog.add(entry);
                        transactionLog.commit();
                } catch ( IOException e ) {
                        throwJMSException("Could not write to the tranaction log.",e);
                }
        }
        
        synchronized public void remove( SpyMessage message, Long transactionId ) 
throws JMSException {
                try{
                        
                        messageRemovedRecord.messageId = message.messageId;
                        if( transactionId == null ) {
                                messageRemovedRecord.isTransacted = false;
                        } else {
                                messageRemovedRecord.isTransacted = true;
                                messageRemovedRecord.transactionId = 
transactionId.longValue();
                        }
                        transactionLog.add(messageRemovedRecord);
                        transactionLog.commit();
                        
                } catch ( IOException e ) {
                        throwJMSException("Could not write to the queue's tranaction 
log.",e);
                }
  
        }       
        
        synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws 
JMSException {
  
                java.util.HashMap messageIndex = new java.util.HashMap();
                        
                try {   
                        ObjectIntegrityLog.IndexItem objects[] = 
transactionLog.toIndex();
                        
                        for( int i=0; i < objects.length; i++ ) {
                                
                                Object o = objects[i].record;
                                if( o instanceof MessageAddedRecord ) {
                                        
                                        MessageAddedRecord r = (MessageAddedRecord)o;
                                        r.message.messageId = r.messageId;
  
                                        if( r.isTransacted && !commited.contains(new 
Long(r.transactionId)) ) {
                                                // the TX this message was part of was 
not
                                                // commited... so drop this message
                                                continue;
                                        }
                                        
                                        messageIndex.put( new Long(r.messageId), 
objects[i]);
                                        
                                } else if( o instanceof MessageRemovedRecord ) {
                                        
                                        MessageRemovedRecord r = 
(MessageRemovedRecord)o;
  
                                        if( r.isTransacted && !commited.contains(new 
Long(r.transactionId)) ) {
                                                // the TX this message was part of was 
not
                                                // commited... so drop this message
                                                continue;
                                        }
                                        
                                        messageIndex.remove( new Long(r.messageId));
                                        
                                }
                                
                        }
                } catch ( Exception e ) {
                        throwJMSException("Could not rebuild the queue from the 
queue's tranaction log.",e);
                }
  
                SpyMessage rc[] = new SpyMessage[messageIndex.size()];
                java.util.Iterator iter = messageIndex.values().iterator();
                for( int i=0; iter.hasNext(); i++ ) {
                        ObjectIntegrityLog.IndexItem item = 
(ObjectIntegrityLog.IndexItem)iter.next();
                        rc[i] = ((MessageAddedRecord)item.record).message;
                }
                return rc;              
        }       
        
        /**
         * Always throws a JMSException carrying the given message, with the
         * given exception attached as its linked exception.
         *
         * @param message  the text of the JMSException
         * @param e        the underlying cause
         * @throws JMSException  always
         */
        private void throwJMSException(String message, Exception e) throws JMSException {
                JMSException wrapped = new JMSException(message);
                wrapped.setLinkedException(e);
                throw wrapped;
        }
        
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLogTester.java
  
  Index: SpyMessageLogTester.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.pm.logged;
  
  
  
  import org.jbossmq.*;
  
  /**
   * This class was used to perform unit testing on the SpyMessageLog/SpyTxLog
   * 
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $    
   */
  public class SpyMessageLogTester {
  
                /**
         * Starts the application.
         * @param args an array of command-line arguments
         */
        public static void main(java.lang.String[] args) throws Exception {
  
                SpyTxLog tm = new SpyTxLog("SpyTxManager1.dat");
                SpyMessageLog log = new SpyMessageLog("SpyMessageLog1.dat");
                
                try{    
  
                        java.util.TreeSet commited = tm.restore();                     
         
                        SpyMessage[] queue = log.restore(commited);
  
                        System.out.println("Recovered :"+queue.length+" message from 
the message log");
                        long maxMessageId=0;
                        for( int i=0; i < queue.length; i++ ) {
                                System.out.println("  #"+i+": "+queue[i]);
                                maxMessageId = Math.max(maxMessageId, 
queue[i].messageId );
                        }
  
                        Long tx1 = tm.createTx();
                        
                        long first = ++maxMessageId;
                        add(log, first,tx1);
                        long second = ++maxMessageId;
                        add(log, second, tx1);
                        remove(log, first, tx1);
  
                        System.out.println("Commiting");
                        tm.commitTx(tx1);
  
                        Long tx2 = tm.createTx();
                        add(log, first,tx2);
  
                        System.out.println("Rolling back");
                        tm.rollbackTx(tx2);
  
                        add(log, second+1, null);
                        
                        System.exit(0);
                } finally {
                        log.close();
                }
  
        }
  
  
        public static void add(SpyMessageLog log, long messageId, Long txid) throws 
Exception {
  
                SpyTextMessage m = new SpyTextMessage();
                m.messageId = messageId;
                m.setText("Hello World #"+m.messageId);
                System.out.println("Adding message: "+m+",tx="+txid);
                log.add(m,txid);
  
        }       
        
        public static void remove(SpyMessageLog log, long messageId, Long txid) throws 
Exception {
  
                SpyTextMessage m = new SpyTextMessage();
                m.messageId = messageId;
                m.setText("Hello World #"+m.messageId);
                System.out.println("Removing message: "+m+", tx="+txid);
                log.remove(m,txid);
  
        }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/pm/logged/SpyTxLog.java
  
  Index: SpyTxLog.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.pm.logged;
   
  import java.io.Serializable;
  import java.io.IOException;
  
  import javax.jms.JMSException;
  
  /**
   * This is used to keep a log of committed transactions.
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $    
   */
  public class SpyTxLog {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        private ObjectIntegrityLog transactionLog;
        private long nextTransactionId = Long.MIN_VALUE;
        
        /////////////////////////////////////////////////////////////////////
        // Constructors
        /////////////////////////////////////////////////////////////////////
        public SpyTxLog(String fileName) throws JMSException {
                try {
                        transactionLog = new ObjectIntegrityLog(fileName);
                } catch (IOException e) {
                        throwJMSException("Could not open the queue's tranaction log: 
" + fileName, e);
                }
        }
  
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
        synchronized public void close() throws JMSException {
                try{
                        transactionLog.close();         
                } catch ( IOException e ) {
                        throwJMSException("Could not close the queue's tranaction 
log.",e);
                }
        }
        
        synchronized public void commitTx(Long id) throws JMSException {
                                
                try {
                        transactionLog.add(id);
                        transactionLog.commit();
                } catch ( IOException e ) {
                        throwJMSException("Could not create a new transaction.",e);
                }
                
        }
        
        synchronized public Long createTx() throws JMSException {
                return new Long(nextTransactionId++);
        }
        
        synchronized public java.util.TreeSet restore() throws JMSException {
                
                java.util.TreeSet items=null;
                try {
                        items = transactionLog.toTreeSet();
                } catch ( Exception e ) {
                        throwJMSException("Could not restore the transaction log.",e);
                }               
  
                long maxId = Long.MIN_VALUE;
                java.util.Iterator iter = items.iterator();
                while( iter.hasNext() ) {
                        Long l = (Long)iter.next();
                        if( l.longValue() > maxId )
                                maxId = l.longValue();
                }
  
                nextTransactionId = maxId+1;
                return items;
                        
        }
  
        synchronized public void rollbackTx(Long txId) throws JMSException {
                        
        }
        
        /////////////////////////////////////////////////////////////////////
        // Private Methods
        /////////////////////////////////////////////////////////////////////
        private void throwJMSException(String message, Exception e) throws 
JMSException {
                JMSException newE = new JMSException(message);
                newE.setLinkedException(e);
                throw newE;             
        }
        
  }
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to