User: pkendall
  Date: 01/05/15 00:15:45

  Added:       src/main/org/jbossmq/persistence PersistenceManager.java
  Log:
  Taken & modified from server package to form new persistence logfile based
  persistence package.
  
  Revision  Changes    Path
  1.1                  jbossmq/src/main/org/jbossmq/persistence/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.persistence;
  
  import javax.jms.JMSException;
  
  import java.net.URL;
  import java.util.HashMap;
  import java.util.TreeSet;
  import java.util.Iterator;
  import java.util.LinkedList;
  
  import org.jbossmq.xml.XElement;
  import org.jbossmq.server.JMSServer;
  import org.jbossmq.server.JMSDestination;
  import org.jbossmq.SpyMessage;
  import org.jbossmq.SpyDestination;
  import org.jbossmq.SpyDistributedConnection;
  
  /**
   *    This class manages all persistence related services.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   *
   *    @version $Revision: 1.1 $
   */
  public class PersistenceManager extends org.jbossmq.server.PersistenceManager {
  
        // The server this persistence manager is providing service for
        JMSServer server;
        // The configuration data for the manager.
        XElement configElement;
        // The directory where persistence data should be stored
        URL dataDirectory;
        // Log file used to store commited transactions.
        SpyTxLog spyTxLog;
        // Maps SpyDestinations to SpyMessageLogs
        HashMap messageLogs = new HashMap();
  
        static class LogInfo {
                SpyMessageLog log;
                SpyDestination destination;
                String queueId;
  
                LogInfo(SpyMessageLog log, SpyDestination destination, String queueId) 
{
                        this.log=log;
                        this.destination=destination;
                        this.queueId=queueId;
                }
  
        }
  
        /**
         * PersistenceManager constructor.
         */
        public PersistenceManager(JMSServer server, XElement configElement) throws 
javax.jms.JMSException {
  
                try {
  
                        this.server = server;
                        this.configElement = configElement;
  
                        URL configFile = 
getClass().getClassLoader().getResource("jbossmq.xml");
                        dataDirectory = new URL(configFile, 
configElement.getField("DataDirectory"));
                        URL txLogFile = new URL(dataDirectory, "transactions.dat");
                        spyTxLog = new SpyTxLog(txLogFile.getFile());
  
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
  
        public Long createPersistentTx() throws javax.jms.JMSException {
                return spyTxLog.createTx();
        }
  
        public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
                spyTxLog.commitTx(txId);
        }
  
        public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
                spyTxLog.rollbackTx(txId);
        }
  
        public void restore() throws javax.jms.JMSException {
  
                TreeSet commitedTXs = spyTxLog.restore();
                HashMap clone;
                synchronized (messageLogs) {
                        clone = (HashMap) messageLogs.clone();
                }
  
                Iterator iter = clone.values().iterator();
                while (iter.hasNext()) {
  
                        LogInfo logInfo = (LogInfo)iter.next();
  
                        JMSDestination q = 
server.getJMSDestination(logInfo.destination);
  
                        SpyMessage rebuild[] = logInfo.log.restore(commitedTXs);
  
                        //TODO: make sure this lock is good enough
                        synchronized (q) {
                                for (int i = 0; i < rebuild.length; i++) {
                                        q.restoreMessage(rebuild[i], logInfo.queueId);
                                }
                        }
                }
  
        }
  
        public void initQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  
                try {
  
                        URL logFile = new URL(dataDirectory, 
dest.toString()+"-"+queueId+".dat");
                        SpyMessageLog log = new SpyMessageLog(logFile.getFile());
  
                        LogInfo info = new LogInfo(log, dest, queueId);
  
                        messageLogs.put(""+dest+"-"+queueId, info);
  
                } catch (javax.jms.JMSException e) {
                        throw e;
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
  
        public void destroyQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  
                try {
  
                        URL logFile = new URL(dataDirectory, 
dest.toString()+"-"+queueId+".dat");
                        java.io.File file = new java.io.File(logFile.getFile());
  
                        SpyMessageLog log = 
(SpyMessageLog)messageLogs.remove(""+dest+"-"+queueId);
                        if( log == null )
                                throw new JMSException("The persistence log was never 
initialized");
                        log.close();
  
                        file.delete();
  
                } catch (javax.jms.JMSException e) {
                        throw e;
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
                        newE.setLinkedException(e);
                        throw newE;
                }
  
        }
  
        public void add(String queueId, org.jbossmq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
  
                LogInfo logInfo;
  
                synchronized (messageLogs) {
                        logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
                }
  
                if (logInfo == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  
                logInfo.log.add(message, txId);
  
        }
  
        public void remove(String queueId, org.jbossmq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
  
                LogInfo logInfo;
  
                synchronized (messageLogs) {
                        logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
                }
  
                if (logInfo == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  
                logInfo.log.remove(message, txId);
  
        }
  }
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to