User: pkendall
  Date: 01/05/15 00:18:01

  Added:       src/main/org/jbossmq/filepersistence MessageLog.java
                        PersistenceManager.java TxLog.java
  Log:
  New file based persistence package.  Each persistent message is a single file.
  
  Revision  Changes    Path
  1.1                  jbossmq/src/main/org/jbossmq/filepersistence/MessageLog.java
  
  Index: MessageLog.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.filepersistence;
  
  import java.io.IOException;
  import java.io.Serializable;
  import java.io.FileOutputStream;
  import java.io.ObjectOutputStream;
  import java.io.FileInputStream;
  import java.io.ObjectInputStream;
  import java.io.File;
  
  import javax.jms.JMSException;
  
  import org.jbossmq.SpyMessage;
  
  /**
   * This is used to keep SpyMessages on the disk and is used reconstruct the
   * queue in case of provider failure.
   *
   * @author: Paul Kendall ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $
   */
  public class MessageLog {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        private File queueName;
  
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public MessageLog(String fileName) throws JMSException {
      queueName = new File(fileName);
      queueName.mkdir();
        }
  
  
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
        public void close() throws JMSException {
        }
  
        public void add( SpyMessage message, Long transactionId ) throws JMSException {
                try{
        FileOutputStream file = new FileOutputStream(new File(queueName, 
Long.toHexString(message.messageId)+"@"+message.getJMSMessageID()+".msg"));
        ObjectOutputStream out = new ObjectOutputStream(file);
        out.writeObject(transactionId);
        out.writeLong(message.messageId);
        out.writeObject(message);
        out.flush();
        out.close();
                } catch ( IOException e ) {
                        throwJMSException("Could not write to the tranaction log.",e);
                }
        }
  
        public void remove( SpyMessage message, Long transactionId ) throws 
JMSException {
      File file = new File(queueName, 
Long.toHexString(message.messageId)+"@"+message.getJMSMessageID()+".msg");
  
                // I know this looks silly!  But sometimes (but not often) M$ systems 
fail
                // on the first delete
      if(!file.delete()) {
        Thread.yield();
        if( file.exists() ) {
          if(!file.delete())
            System.out.println("Failed to delete file: "+file.getAbsolutePath());
        }
        else {
          System.out.println("File was deleted, but delete() failed for: 
"+file.getAbsolutePath());
        }
      }
        }
  
        public SpyMessage[] restore(java.util.TreeSet comittingTXs) throws 
JMSException {
  
                java.util.TreeMap messageIndex = new java.util.TreeMap();
  
                try {
        File[] files = queueName.listFiles();
        for( int i=0 ; i<files.length ; i++ ) {
          ObjectInputStream in = new ObjectInputStream(new FileInputStream(files[i]));
          Long txId = (Long)in.readObject();
          if( comittingTXs!=null && comittingTXs.contains(txId) ) {
            in.close();
            if(!files[i].delete()) {
              Thread.sleep(10);
              if(!files[i].delete()){
                System.out.println("Could not delete file: 
"+files[i].getAbsolutePath());
              }
            }
          }
          else {
            long msgId = in.readLong();
            SpyMessage message = (SpyMessage)in.readObject();
            in.close();
            message.messageId = msgId;
            messageIndex.put(new Long(msgId), message);
          }
        }
                } catch ( Exception e ) {
                        throwJMSException("Could not rebuild the queue from the 
queue's tranaction log.",e);
                }
  
                SpyMessage rc[] = new SpyMessage[messageIndex.size()];
                java.util.Iterator iter = messageIndex.values().iterator();
                for( int i=0; iter.hasNext(); i++ )
                        rc[i] = (SpyMessage)iter.next();
                return rc;
        }
  
        private void throwJMSException(String message, Exception e) throws 
JMSException {
                JMSException newE = new JMSException(message);
                newE.setLinkedException(e);
                throw newE;
        }
  
  }
  
  
  1.1                  
jbossmq/src/main/org/jbossmq/filepersistence/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.filepersistence;
  
  import javax.jms.JMSException;
  
  import java.net.URL;
  import java.util.HashMap;
  import java.util.TreeSet;
  import java.util.Iterator;
  import java.util.LinkedList;
  
  import org.jbossmq.xml.XElement;
  import org.jbossmq.server.JMSServer;
  import org.jbossmq.server.JMSDestination;
  import org.jbossmq.SpyMessage;
  import org.jbossmq.SpyDestination;
  import org.jbossmq.SpyDistributedConnection;
  
  /**
   *    This class manages all persistence related services for file based
   *  persistence.
   *
   *    @author Paul Kendall ([EMAIL PROTECTED])
   *
   *    @version $Revision: 1.1 $
   */
  public class PersistenceManager extends org.jbossmq.server.PersistenceManager {
  
        // The server this persistence manager is providing service for
        JMSServer server;
        // The configuration data for the manager.
        XElement configElement;
        // The directory where persistence data should be stored
        URL dataDirectory;
        // Log file used to store commited transactions.
        TxLog txLog;
        // Maps SpyDestinations to SpyMessageLogs
        HashMap messageLogs = new HashMap();
        // Maps (Long)txIds to LinkedList of AddFile tasks
        HashMap transactedTasks = new HashMap();
  
        static class LogInfo {
                MessageLog log;
                SpyDestination destination;
                String queueId;
  
                LogInfo(MessageLog log, SpyDestination destination, String queueId) {
                        this.log=log;
                        this.destination=destination;
                        this.queueId=queueId;
                }
        }
  
        /**
         * PersistenceManager constructor.
         */
        public PersistenceManager(JMSServer server, XElement configElement) throws 
javax.jms.JMSException {
  
                try {
  
                        this.server = server;
                        this.configElement = configElement;
  
                        URL configFile = 
getClass().getClassLoader().getResource("jbossmq.xml");
                        dataDirectory = new URL(configFile, 
configElement.getField("DataDirectory"));
                        URL txLogFile = new URL(dataDirectory, "transactions.dat");
                        txLog = new TxLog(txLogFile.getFile());
  
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
  
        public void restore() throws javax.jms.JMSException {
  
                TreeSet committingTXs = txLog.restore();
                HashMap clone;
                synchronized (messageLogs) {
                        clone = (HashMap) messageLogs.clone();
                }
  
                Iterator iter = clone.values().iterator();
                while (iter.hasNext()) {
  
                        LogInfo logInfo = (LogInfo)iter.next();
  
                        JMSDestination q = 
server.getJMSDestination(logInfo.destination);
  
                        SpyMessage rebuild[] = logInfo.log.restore(committingTXs);
  
                        //TODO: make sure this lock is good enough
                        synchronized (q) {
                                for (int i = 0; i < rebuild.length; i++) {
                                        q.restoreMessage(rebuild[i], logInfo.queueId);
                                }
                        }
                }
  
        }
  
        public Long createPersistentTx() throws javax.jms.JMSException {
                Long txId = txLog.createTx();
                synchronized (transactedTasks) {
        transactedTasks.put(txId, new LinkedList());
                }
                return txId;
        }
  
        public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
  
                LinkedList transacted;
                synchronized (transactedTasks) {
                        transacted = (LinkedList)transactedTasks.remove(txId);
                }
      synchronized(transacted){
        Iterator iter = transacted.iterator();
        while( iter.hasNext() ) {
          Transaction task = (Transaction)iter.next();
          task.commit();
        }
      }
  
                txLog.commitTx(txId);
        }
  
        public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
  
                LinkedList transacted;
      synchronized(transactedTasks){
                        transacted = (LinkedList)transactedTasks.remove(txId);
                }
      synchronized(transacted){
        Iterator iter = transacted.iterator();
        while( iter.hasNext() ) {
          Transaction task = (Transaction)iter.next();
          task.rollback();
        }
      }
  
                txLog.rollbackTx(txId);
        }
  
        public void initQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  
                try {
  
                        URL logDir = new URL(dataDirectory, 
dest.toString()+"-"+queueId);
                        MessageLog log = new MessageLog(logDir.getFile());
  
                        LogInfo info = new LogInfo(log, dest, queueId);
  
        synchronized(messageLogs){
                          messageLogs.put(""+dest+"-"+queueId, info);
        }
  
                } catch (javax.jms.JMSException e) {
                        throw e;
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
  
        public void destroyQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  
                try {
  
                        URL logDir = new URL(dataDirectory, 
dest.toString()+"-"+queueId);
                        java.io.File file = new java.io.File(logDir.getFile());
  
        LogInfo logInfo;
        synchronized(messageLogs){
                          logInfo = (LogInfo)messageLogs.remove(""+dest+"-"+queueId);
        }
                        if( logInfo == null )
                                throw new JMSException("The persistence log was never 
initialized");
  
                        logInfo.log.close();
                        file.delete();
  
                } catch (javax.jms.JMSException e) {
                        throw e;
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
  
        public void add(String queueId, org.jbossmq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
  
                LogInfo logInfo;
  
                synchronized (messageLogs) {
                        logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
                }
  
                if (logInfo == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  
          logInfo.log.add(message, txId);
  
      if( txId != null ) {
        LinkedList tasks;
        synchronized( transactedTasks ) {
          tasks = (LinkedList)transactedTasks.get(txId);
        }
        if( tasks == null )
          throw new javax.jms.JMSException("Transaction is not active 5.");
        synchronized(tasks){
          tasks.addLast(new Transaction(true, logInfo, message, txId));
        }
      }
  
        }
  
        public void remove(String queueId, org.jbossmq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
  
                LogInfo logInfo;
  
                synchronized (messageLogs) {
                        logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
                }
  
                if (logInfo == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  
      if( txId == null )
                  logInfo.log.remove(message, txId);
      else {
        LinkedList tasks;
        synchronized (transactedTasks) {
          tasks = (LinkedList)transactedTasks.get(txId);
        }
        if( tasks == null )
          throw new javax.jms.JMSException("Transaction is not active 6.");
        synchronized(tasks){
          tasks.addLast(new Transaction(false, logInfo, message, txId));
        }
      }
  
        }
  
    class Transaction {
      private LogInfo logInfo;
      private SpyMessage message;
      private Long txId;
      private boolean add;
      public Transaction(boolean add, LogInfo logInfo, SpyMessage message, Long txId) {
        this.add = add;
        this.logInfo = logInfo;
        this.message = message;
        this.txId = txId;
      }
      public void commit() throws JMSException {
        if(!add)
          logInfo.log.remove(message, txId);
      }
      public void rollback() throws JMSException {
        if(add)
                    logInfo.log.remove(message, txId);
      }
    }
  
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/filepersistence/TxLog.java
  
  Index: TxLog.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.filepersistence;
  
  import java.io.Serializable;
  import java.io.IOException;
  import java.io.RandomAccessFile;
  import java.util.TreeSet;
  import java.util.Iterator;
  
  import javax.jms.JMSException;
  
  /**
   * This is used to keep a log of active transactions.
   * It is used to rollback transactions when the system restarts.
   *
   * @author: Paul Kendall ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $
   */
  public class TxLog {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        private RandomAccessFile transactionLog;
    private TreeSet transactions = new TreeSet();
        private long nextTransactionId = Long.MIN_VALUE;
  
        /////////////////////////////////////////////////////////////////////
        // Constructors
        /////////////////////////////////////////////////////////////////////
        public TxLog(String fileName) throws JMSException {
                try {
                        transactionLog = new RandomAccessFile(fileName, "rw");
                } catch (IOException e) {
                        throwJMSException("Could not open tranaction log: " + 
fileName, e);
                }
        }
  
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
        synchronized public void close() throws JMSException {
                try{
                        transactionLog.close();
                } catch ( IOException e ) {
                        throwJMSException("Could not close tranaction log.",e);
                }
        }
  
        synchronized public Long createTx() throws JMSException {
                Long id = new Long(nextTransactionId++);
      transactions.add(id);
                try {
        transactionLog.writeLong(nextTransactionId);
        transactionLog.writeInt(transactions.size());
        for(Iterator iter = transactions.iterator() ; iter.hasNext() ;) {
          transactionLog.writeLong(((Long)iter.next()).longValue());
        }
        transactionLog.seek(0);
                } catch ( IOException e ) {
                        throwJMSException("Could not write transaction log on 
commit.",e);
                }
      return id;
        }
  
        synchronized public void commitTx(Long txId) throws JMSException {
                try {
        transactions.remove(txId);
        transactionLog.writeLong(nextTransactionId);
        transactionLog.writeInt(transactions.size());
        for(Iterator iter = transactions.iterator() ; iter.hasNext() ;) {
          transactionLog.writeLong(((Long)iter.next()).longValue());
        }
        transactionLog.seek(0);
                } catch ( IOException e ) {
                        throwJMSException("Could not write transaction log on 
commit.",e);
                }
        }
  
        synchronized public void rollbackTx(Long txId) throws JMSException {
                try {
        transactions.remove(txId);
        transactionLog.writeLong(nextTransactionId);
        transactionLog.writeInt(transactions.size());
        for(Iterator iter = transactions.iterator() ; iter.hasNext() ;) {
          transactionLog.writeLong(((Long)iter.next()).longValue());
        }
        transactionLog.seek(0);
                } catch ( IOException e ) {
                        throwJMSException("Could not write transaction log on 
rollback.",e);
                }
        }
  
        synchronized public java.util.TreeSet restore() throws JMSException {
                java.util.TreeSet items=null;
  
      try {
        if( transactionLog.length() != 0 ) {
          nextTransactionId = transactionLog.readLong();
          int size = transactionLog.readInt();
          if( size > 0 ) {
            items = new java.util.TreeSet();
            for(int i=0 ; i<size ; i++) {
              long txId = transactionLog.readLong();
              items.add(new Long(txId));
            }
          }
        }
        transactionLog.seek(0);
        transactionLog.setLength(0);
      } catch ( Exception e ) {
        throwJMSException("Could not restore the transaction log.",e);
      }
  
                return items;
        }
  
        /////////////////////////////////////////////////////////////////////
        // Private Methods
        /////////////////////////////////////////////////////////////////////
        private void throwJMSException(String message, Exception e) throws 
JMSException {
                JMSException newE = new JMSException(message);
                newE.setLinkedException(e);
                throw newE;
        }
  
  }
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to