User: pkendall
  Date: 01/07/31 13:17:52

  Added:       src/main/org/jbossmq/pm/rollinglogged IntegrityLog.java
                        ObjectIntegrityLog.java PersistenceManager.java
                        PersistenceManagerMBean.java SpyMessageLog.java
                        SpyTxLog.java
  Log:
  new persistence mechanism
  
  Revision  Changes    Path
  1.1                  jbossmq/src/main/org/jbossmq/pm/rollinglogged/IntegrityLog.java
  
  Index: IntegrityLog.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.pm.rollinglogged;
  
  import java.io.RandomAccessFile;
  import java.io.OutputStream;
  import java.io.InputStream;
  import java.io.IOException;
  import java.io.File;
  
  
  /**
   * A log file that guarantees its integrity up to the last commit point.
   *
   * <p>The file starts with a 16 byte header holding two big-endian longs:
   * the position of the first record and the position just past the last
   * committed byte.  Data written through the stream returned by
   * getOutputStream() only becomes part of the persistent log once commit()
   * rewrites that header; rollback() re-reads the header and thereby
   * discards everything written since the last commit.
   *
   * <p>The InputStream returned by getInputStream() reads the data placed
   * into the log with the OutputStream returned by getOutputStream().  Its
   * EOF is the current end-of-log position; after a (re)open or a rollback()
   * that is the last committed point of the OutputStream.
   *
   * <p>commit() does not force the file to the storage device, and this
   * class performs no synchronization of its own.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $
   */
  public class IntegrityLog {

      /////////////////////////////////////////////////////////////////////
      // Attributes
      /////////////////////////////////////////////////////////////////////
      private static final int HEADER_SIZE = 16; // in bytes

      // Header related stuff
      private long firstRecordPos;
      private long nextRecordPos;
      private byte headerBytes[] = new byte[HEADER_SIZE];

      private RandomAccessFile raf;
      private File f;

      private LogOutputStream outputStream;
      private LogInputStream inputStream;


      /////////////////////////////////////////////////////////////////////
      // Helper Inner Classes
      /////////////////////////////////////////////////////////////////////

      /**
       * Reads the log from the first record up to the current end of log.
       */
      class LogInputStream extends InputStream {

          boolean closed = false;
          long inputPos = 0;

          /** @return the file position the next read will start at. */
          public long getFilePointer() {
              return inputPos;
          }

          public void close() throws IOException {
              super.close();
              closed = true;
          }

          public int read() throws IOException {
              // never read inside the header
              inputPos = Math.max(inputPos, firstRecordPos);
              int rc = IntegrityLog.this.read(inputPos);
              if (rc >= 0) {
                  inputPos++;
              }
              return rc;
          }

          public int read(byte bytes[], int off, int len) throws IOException {
              // InputStream contract: a zero length request reads nothing and
              // returns 0, even when the stream is at EOF.
              if (len == 0) {
                  return 0;
              }
              inputPos = Math.max(inputPos, firstRecordPos);
              int rc = IntegrityLog.this.read(inputPos, bytes, off, len);
              if (rc >= 0) {
                  inputPos += rc;
              }
              return rc;
          }
      }

      /**
       * Appends to the end of the log.  The data is not part of the
       * persistent log until IntegrityLog.commit() is called.
       */
      class LogOutputStream extends OutputStream {

          boolean closed = false;

          /** @return the file position the next write will go to. */
          public long getFilePointer() {
              return nextRecordPos;
          }

          public void close() throws IOException {
              super.close();
              closed = true;
          }

          public void write(int b) throws IOException {
              IntegrityLog.this.write((byte) b);
          }

          public void write(byte bytes[], int off, int len) throws IOException {
              IntegrityLog.this.write(bytes, off, len);
          }
      }


      /////////////////////////////////////////////////////////////////////
      // Constructor
      /////////////////////////////////////////////////////////////////////

      /**
       * Opens the log stored in the given file, creating it when needed.
       *
       * A file that does not yet hold a complete header (for example the
       * empty file left behind when the process died between creating the
       * file and writing its first header) is initialised as a new, empty
       * log instead of failing with an EOFException.
       *
       * @param fileName path of the log file
       * @throws IOException if the file cannot be opened, read or written
       */
      public IntegrityLog(String fileName) throws IOException {
          f = new File(fileName);
          boolean exists = f.isFile();

          raf = new RandomAccessFile(f, "rw");
          try {
              if (exists && raf.length() >= HEADER_SIZE) {
                  loadHeader();
              } else {
                  initHeader();
              }
          } catch (IOException e) {
              // do not leak the file handle when the log cannot be opened
              try {
                  raf.close();
              } catch (IOException ignored) {
                  // the original failure is the one worth reporting
              }
              raf = null;
              throw e;
          }
      }

      /////////////////////////////////////////////////////////////////////
      // Public Methods
      /////////////////////////////////////////////////////////////////////

      /**
       * @return the current input stream; a new one, positioned at the first
       *         record, is created if there is none or the last one was closed
       */
      public LogInputStream getInputStream() {
          if (inputStream == null || inputStream.closed) {
              inputStream = new LogInputStream();
          }
          return inputStream;
      }

      /**
       * @return the current output stream; a new one is created if there is
       *         none or the last one was closed
       */
      public LogOutputStream getOutputStream() throws IOException {
          if (outputStream == null || outputStream.closed) {
              outputStream = new LogOutputStream();
          }
          return outputStream;
      }

      /**
       * Makes everything written so far part of the log by writing the
       * current first/next record positions into the file header.
       */
      public void commit() throws IOException {
          putLong(headerBytes, 0, firstRecordPos);
          putLong(headerBytes, 8, nextRecordPos);

          raf.seek(0);
          raf.write(headerBytes);
      }

      /**
       * Discards everything written since the last commit by re-reading the
       * record positions from the file header.
       */
      public void rollback() throws IOException {
          loadHeader();
      }

      /**
       * Deletes the log file.  The result of File.delete() is not checked.
       */
      public void delete() throws IOException {
          f.delete();
      }

      /**
       * Closes the underlying file.  Calling close() again is a no-op.
       */
      public void close() throws IOException {
          if (raf == null) {
              return;
          }
          try {
              raf.close();
          } finally {
              raf = null;
          }
      }

      /////////////////////////////////////////////////////////////////////
      // Private Methods
      /////////////////////////////////////////////////////////////////////

      /** Stores value as 8 big-endian bytes (the layout readLong() expects). */
      private static void putLong(byte[] dest, int at, long value) {
          for (int i = 7; i >= 0; i--) {
              dest[at + i] = (byte) (value & 0xFF);
              value >>>= 8;
          }
      }

      private long getBytesLeft(long offset) {
          return nextRecordPos - offset;
      }

      private void initHeader() throws IOException {
          firstRecordPos = HEADER_SIZE;
          nextRecordPos = HEADER_SIZE;

          commit();
      }

      private void loadHeader() throws IOException {
          raf.seek(0);
          firstRecordPos = raf.readLong();
          nextRecordPos = raf.readLong();
      }

      /** Reads one byte at offset, or returns -1 at/after the end of log. */
      private int read(long offset) throws IOException {
          if (offset >= nextRecordPos) {
              return -1;
          }

          if (raf.getFilePointer() != offset) {
              raf.seek(offset);
          }

          return raf.read();
      }

      /**
       * Reads up to len bytes starting at offset, never going past the end
       * of log; returns -1 when offset is at/after the end of log.
       */
      private int read(long offset, byte bytes[], int off, int len) throws IOException {
          if (offset >= nextRecordPos) {
              return -1;
          }

          len = (int) Math.min(len, getBytesLeft(offset));

          if (raf.getFilePointer() != offset) {
              raf.seek(offset);
          }

          return raf.read(bytes, off, len);
      }

      private void write(byte[] record, int off, int len) throws IOException {
          if (raf.getFilePointer() != nextRecordPos) {
              raf.seek(nextRecordPos);
          }

          raf.write(record, off, len);
          nextRecordPos += len;
      }

      private void write(byte b) throws IOException {
          if (raf.getFilePointer() != nextRecordPos) {
              raf.seek(nextRecordPos);
          }

          raf.write(b);
          nextRecordPos++;
      }

  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/pm/rollinglogged/ObjectIntegrityLog.java
  
  Index: ObjectIntegrityLog.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.pm.rollinglogged;
  
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;
  import java.io.IOException;
  import java.io.Serializable;
  import java.io.BufferedInputStream;
  
  import javax.jms.JMSException;
  
  
  /**
   * Keeps a log of Serializable objects on top of an IntegrityLog.
   *
   * Objects are appended with add() and become part of the persistent log
   * when commit() is called; rollback() discards what was added since the
   * last commit.  The to*() methods read the log back from its first record
   * into various collection types.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $
   */
  public class ObjectIntegrityLog {

      /////////////////////////////////////////////////////////////////////
      // Attributes
      /////////////////////////////////////////////////////////////////////
      private IntegrityLog.LogOutputStream logOutputStream;
      private ObjectOutputStream out;
      private IntegrityLog transactionLog;

      /** A logged object together with the log position it was written at. */
      static class IndexItem {
          long recordOffset;
          Object record;
      }


      /////////////////////////////////////////////////////////////////////
      // Helper Inner classes
      /////////////////////////////////////////////////////////////////////
      static class MyObjectOutputStream extends ObjectOutputStream {
          MyObjectOutputStream(OutputStream os) throws IOException {
              super(os);
          }

          /**
           * Disables the writing of the stream header.
           */
          protected void writeStreamHeader() {
          }
      }

      static class MyObjectInputStream extends ObjectInputStream {
          MyObjectInputStream(InputStream is) throws IOException {
              super(is);
          }

          /**
           * Disables the reading of the stream header.
           */
          protected void readStreamHeader() {
          }
      }

      /////////////////////////////////////////////////////////////////////
      // Constructor
      /////////////////////////////////////////////////////////////////////

      /**
       * Opens (or creates) the object log stored in the given file.
       *
       * @param fileName path of the log file
       */
      public ObjectIntegrityLog(String fileName) throws IOException {
          transactionLog = new IntegrityLog(fileName);
          logOutputStream = transactionLog.getOutputStream();
          out = new MyObjectOutputStream(logOutputStream);
      }

      /////////////////////////////////////////////////////////////////////
      // Public Methods
      /////////////////////////////////////////////////////////////////////
      public void commit() throws IOException {
          transactionLog.commit();
      }

      public void rollback() throws IOException {
          transactionLog.rollback();
      }

      public void delete() throws IOException {
          transactionLog.delete();
      }

      public void close() throws IOException {
          transactionLog.close();
      }

      /**
       * Appends an object to the log.
       *
       * @param o the object to serialize into the log
       * @return the object paired with the log position it was written at
       */
      public IndexItem add(Object o) throws IOException {
          IndexItem item = new IndexItem();
          item.record = o;
          item.recordOffset = logOutputStream.getFilePointer();

          out.writeObject(o);
          // reset so that every record is serialized without back-references
          // to previously written records
          out.reset();
          out.flush();

          return item;
      }

      /** @return every object in the log, in log order. */
      public Object[] toArray() throws IOException, ClassNotFoundException {
          java.util.LinkedList ll = new java.util.LinkedList();
          readAll(ll);

          Object rc[] = new Object[ll.size()];
          return (Object[]) ll.toArray(rc);
      }

      /** @return every object in the log, collected in a HashSet. */
      public java.util.HashSet toHashSet() throws IOException, ClassNotFoundException {
          java.util.HashSet hash = new java.util.HashSet();
          readAll(hash);
          return hash;
      }

      /**
       * @return every object in the log, in log order, each paired with the
       *         input stream position observed just before it was read
       */
      public IndexItem[] toIndex() throws IOException, ClassNotFoundException {
          java.util.LinkedList ll = new java.util.LinkedList();

          // Deliberately unbuffered: a BufferedInputStream would read ahead
          // and make logStream.getFilePointer() useless as a record offset.
          IntegrityLog.LogInputStream logStream = transactionLog.getInputStream();
          ObjectInputStream in = new MyObjectInputStream(logStream);

          try {
              while (true) {
                  IndexItem i = new IndexItem();
                  i.recordOffset = logStream.getFilePointer();
                  i.record = in.readObject();
                  ll.addLast(i);
              }
          } catch (java.io.EOFException e) {
              // expected: the end of the log has been reached
          } finally {
              // Always close, so that the next reader gets a fresh stream that
              // starts at the first record even if this read failed half way.
              in.close();
          }

          IndexItem rc[] = new IndexItem[ll.size()];
          return (IndexItem[]) ll.toArray(rc);
      }

      /** @return every object in the log, collected in a TreeSet. */
      public java.util.TreeSet toTreeSet() throws IOException, ClassNotFoundException {
          java.util.TreeSet treeSet = new java.util.TreeSet();
          readAll(treeSet);
          return treeSet;
      }

      /** @return every object in the log, in log order, as a Vector. */
      public java.util.Vector toVector() throws IOException, ClassNotFoundException {
          java.util.Vector vector = new java.util.Vector();
          readAll(vector);
          return vector;
      }

      /////////////////////////////////////////////////////////////////////
      // Private Methods
      /////////////////////////////////////////////////////////////////////

      /**
       * Reads every object of the log, in log order, and add()s it to target.
       * The input stream is closed even when reading fails, so a later call
       * starts again at the first record instead of resuming in mid-log.
       */
      private void readAll(java.util.Collection target) throws IOException, ClassNotFoundException {
          ObjectInputStream in = new MyObjectInputStream(
                  new BufferedInputStream(transactionLog.getInputStream()));
          try {
              while (true) {
                  target.add(in.readObject());
              }
          } catch (java.io.EOFException e) {
              // expected: the end of the log has been reached
          } finally {
              in.close();
          }
      }
  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  package org.jbossmq.pm.rollinglogged;
  
  import javax.jms.JMSException;
  
  import java.net.URL;
  import java.util.HashMap;
  import java.util.TreeSet;
  import java.util.Iterator;
  import java.util.LinkedList;
  import java.util.HashSet;
  
  import org.jbossmq.xml.XElement;
  import org.jbossmq.server.JMSServer;
  import org.jbossmq.server.JMSDestination;
  import org.jbossmq.SpyMessage;
  import org.jbossmq.SpyDestination;
  
  
  import javax.naming.InitialContext;
  import org.jbossmq.pm.TxManager;
  import org.jboss.util.ServiceMBeanSupport;
  import javax.management.*;
  import org.jbossmq.ConnectionToken;
  
  /**
   *    This class manages all persistence related services.
   *
   *    @author David Maplesden
   *
   *    @version $Revision: 1.1 $
   */
  public class PersistenceManager
        extends ServiceMBeanSupport
        implements org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean, 
MBeanRegistration {
  
    public static final int ROLL_OVER_SIZE = 1000;
    public static final String TRANS_FILE_NAME = "transactions.dat";
    public static final boolean DEBUG = false;
        /**
         * NewPersistenceManager constructor.
         */
        public PersistenceManager() throws javax.jms.JMSException {
                txManager = new TxManager( this );
        }
  
  
        private String dataDirectory;
    int numRollOvers = 0;
    HashMap queues = new HashMap();
        // Log file used to store commited transactions.
        SpyTxLog currentTxLog;
    long nextTxId = Long.MIN_VALUE;
        // Maps txLogs to Maps of SpyDestinations to SpyMessageLogs
        HashMap messageLogs = new HashMap();
  
    // Maps transactionIds to txInfos
    HashMap transToTxLogs = new HashMap();
  
        static class LogInfo {
                SpyMessageLog log;
                SpyDestination destination;
      int liveMessages = 0;
      SpyTxLog txLog;
  
                LogInfo(SpyMessageLog log, SpyDestination destination, SpyTxLog txLog) 
{
                        this.log=log;
                        this.destination=destination;
        this.txLog = txLog;
                }
  
        }
  
    static class TxInfo {
      Long txId;
      LinkedList addMessages = new LinkedList();
      LinkedList ackMessages = new LinkedList();
      SpyTxLog log;
      TxInfo(Long txId,SpyTxLog log){
        this.txId = txId;
        this.log = log;
      }
    }
  
        public Long createPersistentTx() throws javax.jms.JMSException {
      Long txId = null;
      SpyTxLog txLog = currentTxLog;
      synchronized(transToTxLogs){
        txId = new Long(++nextTxId);
        transToTxLogs.put(txId,new TxInfo(txId,txLog));
      }
      txLog.createTx();
      return txId;
        }
  
        public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
      TxInfo info = null;
      LinkedList messagesToDelete = null;
      synchronized(transToTxLogs){
        info = (TxInfo)transToTxLogs.remove(txId);
        messagesToDelete = info.ackMessages;
      }
      deleteMessages(messagesToDelete);
                info.log.commitTx(txId);
      checkCleanup(info.log);
        }
  
        public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
      TxInfo info = null;
      LinkedList messagesToDelete = null;
      synchronized(transToTxLogs){
        info = (TxInfo)transToTxLogs.remove(txId);
        messagesToDelete = info.addMessages;
      }
      deleteMessages(messagesToDelete);
                info.log.rollbackTx(txId);
      checkCleanup(info.log);
        }
  
    protected void deleteMessages(LinkedList messages) throws javax.jms.JMSException{
      for(Iterator it = messages.iterator();it.hasNext();){
        LogInfo info = ((LogInfo)((SpyMessage)it.next()).persistData);
        synchronized(info){
          --info.liveMessages;
        }
        checkCleanup(info.txLog);
      }
    }
  
    protected int messageCounter = 0;
    protected void checkRollOver() throws JMSException{
      synchronized(queues){
        int max = queues.size();
        if(max == 0)
          max = ROLL_OVER_SIZE;
        else
          max *= ROLL_OVER_SIZE;
        if(++messageCounter > max){
          messageCounter = 0;
          rollOverLogs();
        }
      }
    }
  
    protected void rollOverLogs() throws JMSException{
      try{
        HashMap logs = new HashMap();
        ++numRollOvers;
        SpyTxLog newTxLog = new SpyTxLog(new 
URL(dataDirURL,TRANS_FILE_NAME+numRollOvers).getFile());
  
        for(Iterator it = queues.values().iterator();it.hasNext();){
          SpyDestination dest = (SpyDestination)it.next();
          SpyMessageLog log = new SpyMessageLog(new URL(dataDirURL, 
dest.toString()+".dat"+numRollOvers).getFile());
          LogInfo logInfo = new LogInfo(log, dest, newTxLog);
          logs.put(""+dest,logInfo);
        }
        SpyTxLog oldLog = currentTxLog;
        synchronized(messageLogs){
          currentTxLog = newTxLog;
          messageLogs.put(newTxLog,logs);
        }
        checkCleanup(oldLog);
      }catch(java.net.MalformedURLException e){
        JMSException jme = new JMSException("Error rolling over logs to new files.");
        jme.setLinkedException(e);
        throw jme;
      }
    }
  
    protected void checkCleanup(SpyTxLog txLog) throws JMSException{
      if(txLog == currentTxLog)
        return;
      HashMap logs;
      synchronized(messageLogs){
        logs = (HashMap) messageLogs.get(txLog);
      }
      synchronized(logs){
        //if no live messages and no live transactions then cleanup
        for(Iterator it = logs.values().iterator();it.hasNext();){
          LogInfo info = (LogInfo)it.next();
          synchronized(info){
            if(info.liveMessages != 0)
              return;
          }
        }
      }
      if(!txLog.completed()){
        return;
      }
      if(DEBUG) System.out.println("Cleaning up");
      //close and delete all logs, remove data from data structures.
      synchronized(messageLogs){
        logs = (HashMap)messageLogs.remove(txLog);
      }
      if(logs == null)
        return;
      txLog.close();
      txLog.delete();
      for(Iterator it = logs.values().iterator();it.hasNext();){
        LogInfo info = (LogInfo)it.next();
        info.log.close();
        info.log.delete();
      }
    }
  
        public void initQueue( SpyDestination dest ) throws javax.jms.JMSException {
  
      String key = ""+dest;
      queues.put(key,dest);
  
        }
  
        public void destroyQueue( SpyDestination dest ) throws javax.jms.JMSException {
  
                try {
        String key = ""+dest;
        queues.remove(key);
  
                        SpyMessageLog log = null;
        HashMap logs;
        synchronized(messageLogs){
          logs = (HashMap)messageLogs.get(currentTxLog);
        }
        synchronized(logs){
          log = (SpyMessageLog)logs.remove(key);
        }
        if( log == null )
          throw new JMSException("The persistence log was never initialized");
        log.close();
        log.delete();
  
        HashSet deleteLogs = new HashSet();
        synchronized(messageLogs){
          for(Iterator it = messageLogs.values().iterator();it.hasNext();){
            logs = (HashMap) it.next();
            synchronized(logs){
              log = (SpyMessageLog)logs.remove(key);
            }
  
            if(log != null){
              deleteLogs.add(log);
            }
          }
        }
        for(Iterator it=deleteLogs.iterator();it.hasNext();){
          log = (SpyMessageLog)it.next();
          log.close();
          log.delete();
        }
  
                } catch (javax.jms.JMSException e) {
                        throw e;
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
  
        public void add(org.jbossmq.SpyMessage message, Long txId) throws 
javax.jms.JMSException {
  
                LogInfo logInfo;
  
      SpyTxLog txLog = null;
      if(txId == null){
        txLog = currentTxLog;
      }else{
        synchronized(transToTxLogs){
          txLog = ((TxInfo)transToTxLogs.get(txId)).log;
        }
      }
  
      HashMap logs;
                synchronized (messageLogs) {
        logs = (HashMap)messageLogs.get(txLog);
                }
      synchronized(logs){
                        logInfo = (LogInfo) logs.get(""+message.getJMSDestination());
                }
  
                if (logInfo == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  
      synchronized(logInfo){
          logInfo.liveMessages++;
        message.persistData = logInfo;
        logInfo.log.add(message, txId);
      }
      if(txId != null){
        synchronized(transToTxLogs){
          TxInfo txInfo = (TxInfo)transToTxLogs.get(txId);
          txInfo.addMessages.add(message);
        }
      }
      checkRollOver();
        }
  
        public void remove(org.jbossmq.SpyMessage message, Long txId) throws 
javax.jms.JMSException {
  
                LogInfo logInfo;
  
      SpyTxLog txLog = ((LogInfo)message.persistData).txLog;
                synchronized (messageLogs) {
        HashMap logs = (HashMap)messageLogs.get(txLog);
                        logInfo = (LogInfo) logs.get(""+message.getJMSDestination());
                }
  
                if (logInfo == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  
      synchronized(logInfo.log){
                logInfo.log.remove(message, txId);
      }
      if(txId != null){
        synchronized(transToTxLogs){
          TxInfo txInfo = (TxInfo)transToTxLogs.get(txId);
          txInfo.ackMessages.add(message);
        }
      }
      if(txId == null){
        synchronized(logInfo){
          --logInfo.liveMessages;
        }
        checkCleanup(txLog);
      }
        }
  
        // The directory where persistence data should be stored
        URL dataDirURL;
        TxManager txManager;
  
  /**
   * Insert the method's description here.
   * Creation date: (6/27/2001 12:53:12 AM)
   * @return java.lang.String
   */
  public java.lang.String getDataDirectory() {
        return dataDirectory;
  }
  
        public String getName() {
                return "JBossMQ-PersistenceManager";
        }
  
  /**
   * getTxManager method comment.
   */
  public org.jbossmq.pm.TxManager getTxManager() {
        return txManager;
  }
  
        public void initService() throws Exception {
  
      if(DEBUG) System.out.println("Using new rolling logged persistence manager.");
  
                URL configFile = getClass().getClassLoader().getResource("jboss.jcml");
  
                dataDirURL = new URL(configFile, dataDirectory);
  
            //Get an InitialContext
                JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );
                server.setPersistenceManager(this);
  
        }
  
        public void restore(org.jbossmq.server.JMSServer server) throws 
javax.jms.JMSException {
  
      TreeSet commitedTxs = new TreeSet();
      HashMap txLogs = new HashMap();
      java.io.File dir = new java.io.File(dataDirURL.getFile());
      java.io.File [] dataFiles = dir.listFiles();
  
      for(int i=0;i<dataFiles.length;++i){
        String name = dataFiles[i].getName();
        if(name.startsWith(TRANS_FILE_NAME)){
          int rollOver = name.charAt(name.length()-1) - '0';
          numRollOvers = Math.max(numRollOvers,rollOver+1);
          SpyTxLog txLog = new SpyTxLog(dataFiles[i].getAbsolutePath());
          txLog.restore(commitedTxs);
          txLogs.put(new Integer(rollOver),txLog);
          messageLogs.put(txLog, new HashMap());
        }
      }
  
      if(!commitedTxs.isEmpty())
        nextTxId = ((Long)commitedTxs.last()).longValue();
  
      for(int i=0;i<dataFiles.length;++i){
        String name = dataFiles[i].getName();
        if(!name.startsWith(TRANS_FILE_NAME)){
          int rollOver = name.charAt(name.length()-1) - '0';
          String key = name.substring(0,name.length()-5);
          SpyMessageLog messageLog = new SpyMessageLog(dataFiles[i].getAbsolutePath());
          SpyMessage [] messages = messageLog.restore(commitedTxs);
          SpyTxLog txLog = (SpyTxLog)txLogs.get(new Integer(rollOver));
          SpyDestination dest = (SpyDestination)queues.get(key);
          JMSDestination q = server.getJMSDestination(dest);
          LogInfo info = new LogInfo(messageLog,dest,txLog);
          info.liveMessages = messages.length;
          HashMap logs = (HashMap)messageLogs.get(txLog);
          logs.put(key,info);
                        //TODO: make sure this lock is good enough
          synchronized(q){
            for(int j=0;j<messages.length;j++){
              messages[j].persistData = info;
              q.restoreMessage(messages[j]);
            }
          }
        }
      }
  
      try{
  
        URL txLogFile = new URL(dataDirURL, TRANS_FILE_NAME+numRollOvers);
        currentTxLog = new SpyTxLog(txLogFile.getFile());
        messageLogs.put(currentTxLog, new HashMap());
  
        for(Iterator it = queues.values().iterator();it.hasNext();){
          SpyDestination dest = (SpyDestination)it.next();
          String key = ""+dest;
          URL logFile = new URL(dataDirURL, dest.toString()+".dat"+numRollOvers);
          SpyMessageLog log = new SpyMessageLog(logFile.getFile());
  
          synchronized(messageLogs){
            LogInfo logInfo = new LogInfo(log, dest, currentTxLog);
            HashMap logs = (HashMap)messageLogs.get(currentTxLog);
            logs.put(key, logInfo);
          }
        }
  
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
        }
  
  /**
   * Insert the method's description here.
   * Creation date: (6/27/2001 12:53:12 AM)
   * @param newDataDirectory java.lang.String
   */
  public void setDataDirectory(java.lang.String newDataDirectory) {
        dataDirectory = newDataDirectory;
  }
  
        public void startService() throws Exception {
  
                JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );
                restore(server);
  
        }
  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManagerMBean.java
  
  Index: PersistenceManagerMBean.java
  ===================================================================
  package org.jbossmq.pm.rollinglogged;
  
  /*
   * jBoss, the OpenSource EJB server
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  
  /**
   * MBean interface for the JBossMQ JMX service.
   *
   * Exposes the data directory of the persistence manager as a management
   * attribute.
   *
   *   @author Vincent Sheffer ([EMAIL PROTECTED])
   *   @version $Revision: 1.1 $
   */
  public interface PersistenceManagerMBean
     extends org.jboss.util.ServiceMBean
  {
     // Constants -----------------------------------------------------

     /** Object name constant of this service. */
     String OBJECT_NAME = ":service=JBossMQ";

     // Public --------------------------------------------------------

     /** @return the configured data directory */
     String getDataDirectory();

     /** @param newDataDirectory the data directory to use */
     void setDataDirectory(String newDataDirectory);
  }
  
  
  
  1.1                  jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyMessageLog.java
  
  Index: SpyMessageLog.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.pm.rollinglogged;
  
  import java.io.IOException;
  import java.io.Serializable;
  import javax.jms.JMSException;
  
  import org.jbossmq.SpyMessage;
  
  /**
   * This is used to keep a log of SpyMessages arriving and leaving
   * a queue.  The log can be used reconstruct the queue in case of
   * provider failure.  Integrety is kept by the use of an ObjectIntegrityLog.
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $
   */
  public class SpyMessageLog {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        private ObjectIntegrityLog transactionLog;
        private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
        private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
  
        /////////////////////////////////////////////////////////////////////
        // Helper Inner Classes
        /////////////////////////////////////////////////////////////////////
        static class MessageAddedRecord implements Serializable {
                long messageId;
                boolean isTransacted;
                long transactionId;
                SpyMessage message;
                private final static long serialVersionUID = 235726945332013954L;
        }
  
        static class MessageRemovedRecord implements Serializable {
                boolean isTransacted;
                long transactionId;
                long messageId;
                private final static long serialVersionUID = 235726945332013955L;
        }
  
  
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public SpyMessageLog(String fileName) throws JMSException {
                try {
                        transactionLog = new ObjectIntegrityLog(fileName);
                } catch ( IOException e ) {
                        throwJMSException("Could not open the queue's tranaction log: 
"+fileName,e);
                }
        }
  
  
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
        synchronized public void close() throws JMSException {
                try{
                        transactionLog.close();
                } catch ( IOException e ) {
                        throwJMSException("Could not close the queue's tranaction 
log.",e);
                }
        }
  
    synchronized public void delete()throws JMSException{
                try{
                        transactionLog.delete();
                } catch ( IOException e ) {
                        throwJMSException("Could not delete the queue's tranaction 
log.",e);
                }
    }
  
        synchronized public void add( SpyMessage message, Long transactionId ) throws 
JMSException {
                try{
  
                        messageAddedRecord.message = message;
                        messageAddedRecord.messageId = message.messageId;
                        if( transactionId == null )     {
                                messageAddedRecord.isTransacted = false;
                        } else {
                                messageAddedRecord.isTransacted = true;
                                messageAddedRecord.transactionId = 
transactionId.longValue();
                        }
  
                        transactionLog.add(messageAddedRecord);
                        transactionLog.commit();
  
                } catch ( IOException e ) {
                        throwJMSException("Could not write to the tranaction log.",e);
                }
  
        }
  
        synchronized public void remove( SpyMessage message, Long transactionId ) 
throws JMSException {
                try{
  
                        messageRemovedRecord.messageId = message.messageId;
                        if( transactionId == null ) {
                                messageRemovedRecord.isTransacted = false;
                        } else {
                                messageRemovedRecord.isTransacted = true;
                                messageRemovedRecord.transactionId = 
transactionId.longValue();
                        }
                        transactionLog.add(messageRemovedRecord);
                        transactionLog.commit();
  
                } catch ( IOException e ) {
                        throwJMSException("Could not write to the queue's tranaction 
log.",e);
                }
  
        }
  
        synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws 
JMSException {
  
                java.util.HashMap messageIndex = new java.util.HashMap();
  
                try {
                        ObjectIntegrityLog.IndexItem objects[] = 
transactionLog.toIndex();
  
                        for( int i=0; i < objects.length; i++ ) {
  
                                Object o = objects[i].record;
                                if( o instanceof MessageAddedRecord ) {
  
                                        MessageAddedRecord r = (MessageAddedRecord)o;
                                        r.message.messageId = r.messageId;
  
                                        if( r.isTransacted && !commited.contains(new 
Long(r.transactionId)) ) {
                                                // the TX this message was part of was 
not
                                                // commited... so drop this message
                                                continue;
                                        }
  
                                        messageIndex.put( new Long(r.messageId), 
objects[i]);
  
                                } else if( o instanceof MessageRemovedRecord ) {
  
                                        MessageRemovedRecord r = 
(MessageRemovedRecord)o;
  
                                        if( r.isTransacted && !commited.contains(new 
Long(r.transactionId)) ) {
                                                // the TX this message was part of was 
not
                                                // commited... so drop this message
                                                continue;
                                        }
  
                                        messageIndex.remove( new Long(r.messageId));
  
                                }
  
                        }
                } catch ( Exception e ) {
                        throwJMSException("Could not rebuild the queue from the 
queue's tranaction log.",e);
                }
  
                SpyMessage rc[] = new SpyMessage[messageIndex.size()];
                java.util.Iterator iter = messageIndex.values().iterator();
                for( int i=0; iter.hasNext(); i++ ) {
                        ObjectIntegrityLog.IndexItem item = 
(ObjectIntegrityLog.IndexItem)iter.next();
                        rc[i] = ((MessageAddedRecord)item.record).message;
                }
                return rc;
        }
  
        private void throwJMSException(String message, Exception e) throws 
JMSException {
                JMSException newE = new JMSException(message);
                newE.setLinkedException(e);
                throw newE;
        }
  
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyTxLog.java
  
  Index: SpyTxLog.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.pm.rollinglogged;
  
  import java.io.Serializable;
  import java.io.IOException;
  
  import javax.jms.JMSException;
  
  /**
   * This is used to keep a log of commited transactions.
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $
   */
  public class SpyTxLog {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        private ObjectIntegrityLog transactionLog;
    private int liveTransactionCount = 0;
    private Object counterLock = new Object();
  
        /////////////////////////////////////////////////////////////////////
        // Constructors
        /////////////////////////////////////////////////////////////////////
        public SpyTxLog(String fileName) throws JMSException {
                try {
                        transactionLog = new ObjectIntegrityLog(fileName);
                } catch (IOException e) {
                        throwJMSException("Could not open the queue's tranaction log: 
" + fileName, e);
                }
        }
  
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
        synchronized public void close() throws JMSException {
                try{
                        transactionLog.close();
                } catch ( IOException e ) {
                        throwJMSException("Could not close the queue's tranaction 
log.",e);
                }
        }
  
        synchronized public void delete() throws JMSException {
                try{
                        transactionLog.delete();
                } catch ( IOException e ) {
                        throwJMSException("Could not delete the queue's tranaction 
log.",e);
                }
        }
  
  
        public void createTx() throws JMSException {
      synchronized(counterLock){
        ++liveTransactionCount;
      }
        }
  
        public boolean completed() throws JMSException {
      synchronized(counterLock){
        return (liveTransactionCount == 0);
      }
        }
  
        synchronized public void commitTx(Long id) throws JMSException {
  
                try {
                        transactionLog.add(id);
                        transactionLog.commit();
        synchronized(counterLock){
          --liveTransactionCount;
        }
                } catch ( IOException e ) {
                        throwJMSException("Could not create a new transaction.",e);
                }
  
        }
  
        synchronized public void restore(java.util.TreeSet result) throws JMSException 
{
                try {
                        result.addAll(transactionLog.toTreeSet());
                } catch ( Exception e ) {
                        throwJMSException("Could not restore the transaction log.",e);
                }
        }
  
        public void rollbackTx(Long txId) throws JMSException {
      synchronized(counterLock){
        --liveTransactionCount;
      }
        }
  
        /////////////////////////////////////////////////////////////////////
        // Private Methods
        /////////////////////////////////////////////////////////////////////
        private void throwJMSException(String message, Exception e) throws 
JMSException {
                JMSException newE = new JMSException(message);
                newE.setLinkedException(e);
                throw newE;
        }
  
  }
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to