User: pkendall
  Date: 01/08/08 18:18:28

  Modified:    src/main/org/jbossmq/pm/file MessageLog.java
                        PersistenceManager.java
  Log:
  Major updates (especially to topics).
  Speed improvements.
  Make JVM IL work (by using a singleton JMSServer).
  Message Listeners re-implemented using client-side thread.
  
  Revision  Changes    Path
  1.2       +151 -48   jbossmq/src/main/org/jbossmq/pm/file/MessageLog.java
  
  Index: MessageLog.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/file/MessageLog.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- MessageLog.java   2001/07/11 02:52:16     1.1
  +++ MessageLog.java   2001/08/09 01:18:28     1.2
  @@ -16,18 +16,29 @@
   
   import javax.jms.JMSException;
   
  -import org.jbossmq.SpyMessage;
  +import org.jbossmq.*;
   
   /**
    * This is used to keep SpyMessages on the disk and is used reconstruct the
    * queue in case of provider failure.
    *
    * @author: Paul Kendall ([EMAIL PROTECTED])
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class MessageLog {
   
        /////////////////////////////////////////////////////////////////////
  +     // Constants
  +     /////////////////////////////////////////////////////////////////////
  +  protected static final byte OBJECT_MESS = 3;
  +  protected static final byte BYTES_MESS = 4;
  +  protected static final byte MAP_MESS = 5;
  +  protected static final byte TEXT_MESS = 6;
  +  protected static final byte STREAM_MESS = 7;
  +  protected static final byte ENCAP_MESS = 8;
  +  protected static final byte SPY_MESS = 9;
  +
  +     /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        private File queueName;
  @@ -36,10 +47,97 @@
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public MessageLog(String fileName) throws JMSException {
  -     queueName = new File(fileName);
  -     queueName.mkdir();
  +    queueName = new File(fileName);
  +    queueName.mkdir();
        }
   
  +     /////////////////////////////////////////////////////////////////////
  +     // Utility Methods
  +     /////////////////////////////////////////////////////////////////////
  +  protected void delete(File file) throws IOException{
  +             // I know this looks silly!  But sometimes (but not often) M$ systems fail
  +             // on the first delete
  +    if(!file.delete()) {
  +      Thread.yield();
  +      if( file.exists() ) {
  +        if(!file.delete())
  +          System.out.println("Failed to delete file: "+file.getAbsolutePath());
  +      } else {
  +        System.out.println("File was deleted, but delete() failed for: 
"+file.getAbsolutePath());
  +      }
  +    }
  +  }
  +
  +  protected void rename(File from, File to) throws IOException{
  +             // I know this looks silly!  But sometimes (but not often) M$ systems fail
  +             // on the first rename (as above)
  +    if(!from.renameTo(to)) {
  +      Thread.yield();
  +      if( from.exists() ) {
  +        if(!from.renameTo(to))
  +          System.out.println("Rename of file "+from.getAbsolutePath()+" to 
"+to.getAbsolutePath()+" failed.");
  +      } else {
  +        System.out.println("Rename of file "+from.getAbsolutePath()+" to 
"+to.getAbsolutePath()+" failed but from no longer exists?");
  +      }
  +    }
  +  }
  +
  +  protected void writeMessageToFile(SpyMessage message,File file) throws 
IOException{
  +    ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(file));
  +    out.writeLong(message.messageId);
  +    if(message instanceof SpyEncapsulatedMessage){
  +      out.writeByte(ENCAP_MESS);
  +    }else if(message instanceof SpyObjectMessage){
  +      out.writeByte(OBJECT_MESS);
  +    }else if(message instanceof SpyBytesMessage){
  +      out.writeByte(BYTES_MESS);
  +    }else if(message instanceof SpyMapMessage){
  +      out.writeByte(MAP_MESS);
  +    }else if(message instanceof SpyTextMessage){
  +      out.writeByte(TEXT_MESS);
  +    }else if(message instanceof SpyStreamMessage){
  +      out.writeByte(STREAM_MESS);
  +    }else{
  +      out.writeByte(SPY_MESS);
  +    }
  +    message.writeExternal(out);
  +    out.flush();
  +    out.close();
  +  }
  +
  +  protected void restoreMessageFromFile(java.util.TreeMap store, File file) throws 
Exception{
  +    ObjectInputStream in = new ObjectInputStream(new FileInputStream(file));
  +    long msgId = in.readLong();
  +    SpyMessage message = null;
  +    byte type = in.readByte();
  +    switch(type){
  +      case OBJECT_MESS:
  +        message = new SpyObjectMessage();
  +        break;
  +      case BYTES_MESS:
  +        message = new SpyBytesMessage();
  +        break;
  +      case MAP_MESS:
  +        message = new SpyMapMessage();
  +        break;
  +      case STREAM_MESS:
  +        message = new SpyStreamMessage();
  +        break;
  +      case TEXT_MESS:
  +        message = new SpyTextMessage();
  +        break;
  +      case ENCAP_MESS:
  +        message = new SpyEncapsulatedMessage();
  +        break;
  +      default:
  +        message = new SpyMessage();
  +    }
  +    message.readExternal(in);
  +    in.close();
  +    message.messageId = msgId;
  +    message.persistData = file;
  +    store.put(new Long(msgId), message);
  +  }
   
        /////////////////////////////////////////////////////////////////////
        // Public Methods
  @@ -47,63 +145,68 @@
        public void close() throws JMSException {
        }
   
  -     public void add( SpyMessage message, Long transactionId ) throws JMSException {
  +     public void add( SpyMessage message, org.jbossmq.pm.Tx transactionId ) throws 
JMSException {
                try{
  -       FileOutputStream file = new FileOutputStream(new File(queueName, 
Long.toHexString(message.messageId)+"@"+message.getJMSMessageID()+".msg"));
  -       ObjectOutputStream out = new ObjectOutputStream(file);
  -       out.writeObject(transactionId);
  -       out.writeLong(message.messageId);
  -       out.writeObject(message);
  -       out.flush();
  -       out.close();
  +      File f;
  +      if(transactionId == null)
  +        f = new File(queueName, message.getJMSMessageID());
  +      else
  +        f = new File(queueName, message.getJMSMessageID()+"."+transactionId);
  +      writeMessageToFile(message,f);
  +      message.persistData = f;
                } catch ( IOException e ) {
                        throwJMSException("Could not write to the tranaction log.",e);
                }
        }
   
  -     public void remove( SpyMessage message, Long transactionId ) throws 
JMSException {
  -     File file = new File(queueName, 
Long.toHexString(message.messageId)+"@"+message.getJMSMessageID()+".msg");
  +  public void finishAdd( SpyMessage message, org.jbossmq.pm.Tx transactionId ) 
throws JMSException {
  +  }
   
  -             // I know this looks silly!  But sometimes (but not often) M$ systems fail
  -             // on the first delete
  -     if(!file.delete()) {
  -       Thread.yield();
  -       if( file.exists() ) {
  -             if(!file.delete())
  -               System.out.println("Failed to delete file: "+file.getAbsolutePath());
  -       }
  -       else {
  -             System.out.println("File was deleted, but delete() failed for: 
"+file.getAbsolutePath());
  -       }
  -     }
  +  public void undoAdd( SpyMessage message, org.jbossmq.pm.Tx transactionId ) throws 
JMSException {
  +             try{
  +      File file = (File)message.persistData;
  +      delete(file);
  +             } catch ( IOException e ) {
  +                     throwJMSException("Could not write to the tranaction log.",e);
  +             }
  +  }
  +
  +     public void remove( SpyMessage message, org.jbossmq.pm.Tx transactionId ) 
throws JMSException {
        }
  +
  +  public void finishRemove( SpyMessage message, org.jbossmq.pm.Tx transactionId ) 
throws JMSException {
  +             try{
  +      File file = (File)message.persistData;
  +      delete(file);
  +             } catch ( IOException e ) {
  +                     throwJMSException("Could not write to the tranaction log.",e);
  +             }
  +  }
   
  -     public SpyMessage[] restore(java.util.TreeSet comittingTXs) throws 
JMSException {
  +  public void undoRemove( SpyMessage message, org.jbossmq.pm.Tx transactionId ) 
throws JMSException {
  +  }
   
  +     public SpyMessage[] restore(java.util.TreeSet rollBackTXs) throws JMSException 
{
  +    //use sorted map to get queue order right.
                java.util.TreeMap messageIndex = new java.util.TreeMap();
   
                try {
  -       File[] files = queueName.listFiles();
  -       for( int i=0 ; i<files.length ; i++ ) {
  -             ObjectInputStream in = new ObjectInputStream(new 
FileInputStream(files[i]));
  -             Long txId = (Long)in.readObject();
  -             if( comittingTXs!=null && comittingTXs.contains(txId) ) {
  -               in.close();
  -               if(!files[i].delete()) {
  -                     Thread.sleep(10);
  -                     if(!files[i].delete()){
  -                       System.out.println("Could not delete file: 
"+files[i].getAbsolutePath());
  -                     }
  -               }
  -             }
  -             else {
  -               long msgId = in.readLong();
  -               SpyMessage message = (SpyMessage)in.readObject();
  -               in.close();
  -               message.messageId = msgId;
  -               messageIndex.put(new Long(msgId), message);
  -             }
  -       }
  +      File[] files = queueName.listFiles();
  +      for( int i=0 ; i<files.length ; i++ ) {
  +        String fileName = files[i].getName();
  +        int extIndex = fileName.indexOf(".");
  +        if(extIndex < 0){
  +          //non transacted message so simply restore
  +          restoreMessageFromFile(messageIndex,files[i]);
  +        }else{
  +          //test if message from a transaction that is being rolled back.
  +          Long tx = new Long(Long.parseLong(fileName.substring(extIndex+1)));
  +          if(rollBackTXs.contains(tx))
  +            delete(files[i]);
  +          else
  +            restoreMessageFromFile(messageIndex,files[i]);
  +        }
  +      }
                } catch ( Exception e ) {
                        throwJMSException("Could not rebuild the queue from the 
queue's tranaction log.",e);
                }
  @@ -116,7 +219,7 @@
        }
   
        private void throwJMSException(String message, Exception e) throws 
JMSException {
  -             JMSException newE = new JMSException(message);
  +             JMSException newE = new SpyJMSException(message);
                newE.setLinkedException(e);
                throw newE;
        }
  
  
  
  1.3       +343 -212  jbossmq/src/main/org/jbossmq/pm/file/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/file/PersistenceManager.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- PersistenceManager.java   2001/07/28 00:33:38     1.2
  +++ PersistenceManager.java   2001/08/09 01:18:28     1.3
  @@ -13,20 +13,18 @@
   import java.util.TreeSet;
   import java.util.Iterator;
   import java.util.LinkedList;
  +import java.io.File;
  +import java.io.IOException;
   
  -import org.jbossmq.xml.XElement;
   import org.jbossmq.server.JMSServer;
   import org.jbossmq.server.JMSDestination;
   import org.jbossmq.SpyMessage;
   import org.jbossmq.SpyDestination;
  +import org.jbossmq.SpyJMSException;
   
  -
  -import javax.naming.InitialContext;
   import org.jbossmq.pm.TxManager;
   import org.jboss.util.ServiceMBeanSupport;
   import javax.management.*;
  -import java.io.Serializable;
  -import org.jbossmq.ConnectionToken;
   
   /**
    *   This class manages all persistence related services for file based
  @@ -34,151 +32,182 @@
    *
    *   @author Paul Kendall ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
  -public class PersistenceManager extends ServiceMBeanSupport implements 
org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean, MBeanRegistration, 
Serializable {
  -
  +public class PersistenceManager extends ServiceMBeanSupport
  +  implements org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean, 
MBeanRegistration {
   
  -
  -     transient private String dataDirectory;
  -     // Log file used to store commited transactions.
  -     transient TxLog txLog;
  +     // The directory where persistence data should be stored
  +  String dataDirectory;
  +     URL dataDirURL;
  +  File dataDirFile;
  +  //tx manager
  +  org.jbossmq.pm.TxManager txManager;
        // Maps SpyDestinations to SpyMessageLogs
  -     transient HashMap messageLogs= new HashMap();
  +     HashMap messageLogs = new HashMap();
        // Maps (Long)txIds to LinkedList of AddFile tasks
  -     transient HashMap transactedTasks= new HashMap();
  +     HashMap transactedTasks = new HashMap();
   
  -     static class LogInfo {
  +  class TxInfo {
  +    File txf;
  +    java.io.RandomAccessFile raf;
  +    LinkedList tasks = new LinkedList();
  +
  +    TxInfo() throws JMSException{
  +    }
  +
  +    void setFile(File f) throws JMSException{
  +      txf = f;
  +      try{
  +        raf = new java.io.RandomAccessFile(txf,"rw");
  +      }catch(IOException e){
  +        JMSException jmse = new SpyJMSException("IO Error create raf for txinfo.");
  +        jmse.setLinkedException(e);
  +        throw jmse;
  +      }
  +    }
  +  }
  +
  +  protected static final int MAX_POOL_SIZE = 50;
  +  protected java.util.ArrayList txPool = new java.util.ArrayList();
  +
  +  protected TxInfo getTxInfo(File f)throws JMSException{
  +    TxInfo info;
  +    synchronized(txPool){
  +      if(txPool.isEmpty()){
  +        info = new TxInfo();
  +      }else{
  +        info = (TxInfo)txPool.remove(txPool.size()-1);
  +      }
  +    }
  +    info.setFile(f);
  +    return info;
  +  }
  +
  +  protected void releaseTxInfo(TxInfo info){
  +    synchronized(txPool){
  +      if(txPool.size() < MAX_POOL_SIZE){
  +        info.tasks.clear();
  +        txPool.add(info);
  +      }
  +    }
  +  }
  +
  +     class LogInfo {
                MessageLog log;
                SpyDestination destination;
   
                LogInfo(MessageLog log, SpyDestination destination) {
  -                     this.log= log;
  -                     this.destination= destination;
  -             }
  -     }
  -
  -
  -
  -
  -
  -     public Long createPersistentTx() throws javax.jms.JMSException {
  -             Long txId = txLog.createTx();
  -             synchronized (transactedTasks) {
  -       transactedTasks.put(txId, new LinkedList());
  -             }
  -             return txId;
  -     }
  -
  -     public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
  -
  -             LinkedList transacted;
  -             synchronized (transactedTasks) {
  -                     transacted = (LinkedList)transactedTasks.remove(txId);
  -             }
  -     synchronized(transacted){
  -       Iterator iter = transacted.iterator();
  -       while( iter.hasNext() ) {
  -             Transaction task = (Transaction)iter.next();
  -             task.commit();
  -       }
  -     }
  -
  -             txLog.commitTx(txId);
  -     }
  -
  -     public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
  -
  -             LinkedList transacted;
  -     synchronized(transactedTasks){
  -                     transacted = (LinkedList)transactedTasks.remove(txId);
  -             }
  -     synchronized(transacted){
  -       Iterator iter = transacted.iterator();
  -       while( iter.hasNext() ) {
  -             Transaction task = (Transaction)iter.next();
  -             task.rollback();
  -       }
  -     }
  -
  -             txLog.rollbackTx(txId);
  -     }
  -
  -
  -
  -
  -
  -
  -
  -
  -
  -     class Transaction {
  -             private LogInfo logInfo;
  -             private SpyMessage message;
  -             private Long txId;
  -             private boolean add;
  -             public Transaction(boolean add, LogInfo logInfo, SpyMessage message, 
Long txId) {
  -                     this.add= add;
  -                     this.logInfo= logInfo;
  -                     this.message= message;
  -                     this.txId= txId;
  +                     this.log=log;
  +                     this.destination=destination;
                }
  -             public void commit() throws JMSException {
  -                     if (!add)
  -                             logInfo.log.remove(message, txId);
  -             }
  -             public void rollback() throws JMSException {
  -                     if (add)
  -                             logInfo.log.remove(message, txId);
  -             }
        }
   
  -     // The directory where persistence data should be stored
  -     transient URL dataDirURL;
  -     transient TxManager txManager;
  -
        /**
         * PersistenceManager constructor.
         */
        public PersistenceManager() throws javax.jms.JMSException {
                txManager = new TxManager( this );
        }
  -
  -/**
  - * Insert the method's description here.
  - * Creation date: (6/27/2001 12:53:12 AM)
  - * @return java.lang.String
  - */
  -public java.lang.String getDataDirectory() {
  -     return dataDirectory;
  -}
  -
  -     public String getName() {
  -             return "JBossMQ-PersistenceManager";
  -     }
  -
  -/**
  - * getTxManager method comment.
  - */
  -public org.jbossmq.pm.TxManager getTxManager() {
  -     return txManager;
  -}
   
  -     public void initService() throws java.lang.Exception {
  +  public String getName() {
  +    return "JBossMQ-PersistenceManager";
  +  }
  +
  +  public void setDataDirectory(java.lang.String newDataDirectory) {
  +    dataDirectory = newDataDirectory;
  +  }
  +
  +  public java.lang.String getDataDirectory() {
  +    return dataDirectory;
  +  }
  +
  +  public org.jbossmq.pm.TxManager getTxManager() {
  +    return txManager;
  +  }
   
  +     public void initService() throws Exception {
                URL configFile = getClass().getClassLoader().getResource("jboss.jcml");
  -             
                dataDirURL = new URL(configFile, dataDirectory);
  -             URL txLogFile = new URL(dataDirURL, "transactions.dat");
  -             txLog = new TxLog(txLogFile.getFile());
  -                                             
  -             JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );                
  +    dataDirFile = new File(dataDirURL.getFile());
  +             JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );
                server.setPersistenceManager(this);
        }
   
  +     public void startService() throws Exception {
  +             JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );
  +             restore(server);
  +     }
  +
  +  protected boolean testRollBackTx(Long tx, java.util.ArrayList removingMessages) 
throws IOException{
  +    //checks to see if this tx was in the middle of committing.
  +    //If it was finish commit and return false else return true.
  +             HashMap clone;
  +             synchronized (messageLogs) {
  +                     clone = (HashMap) messageLogs.clone();
  +             }
  +
  +    java.util.ArrayList files = new java.util.ArrayList();
  +    boolean foundAll = true;
  +    for(int i=0;i<removingMessages.size();i++){
  +      String fileName = removingMessages.get(i)+"."+tx;
  +      boolean found = false;
  +      for(Iterator it = clone.keySet().iterator();!found && it.hasNext();){
  +        String dirName = (String)it.next();
  +        File dir = new File(dataDirFile,dirName);
  +        File [] messageFiles = dir.listFiles();
  +        for(int j=0;j<messageFiles.length;++j){
  +          if(messageFiles[j].getName().equals(fileName)){
  +            found = true;
  +            files.add(messageFiles[j]);
  +            break;
  +          }
  +        }
  +      }
  +      if(!found){
  +        foundAll = false;
  +      }
  +    }
  +    if(!foundAll){
  +      //tx being committed so need to finish it by deleting files.
  +      for(int i=0;i<files.size();++i){
  +        File f = (File)files.get(i);
  +        if(!f.delete()){
  +          Thread.yield();
  +          //try again
  +          if(!f.delete()){
  +            throw new IOException("Could not delete file "+f.getAbsolutePath());
  +          }
  +        }
  +      }
  +      return false;
  +    }
  +    return true;
  +  }
  +
        public void restore(JMSServer server) throws javax.jms.JMSException {
  +    //reconstruct TXs
  +             TreeSet txs = new TreeSet();
  +    File [] transactFiles = dataDirFile.listFiles();
  +    for(int i=0;i<transactFiles.length;i++){
  +      try{
  +        Long tx = new Long(Long.parseLong(transactFiles[i].getName()));
  +        java.util.ArrayList removingMessages = readTxFile(transactFiles[i]);
  +        if(testRollBackTx(tx,removingMessages))
  +          txs.add(tx);
  +      }catch(NumberFormatException e){
  +        System.out.println("Ignoring invalid transaction record file 
"+transactFiles[i].getAbsolutePath());
  +        transactFiles[i] = null;
  +      }catch(IOException e){
  +        JMSException jmse = new SpyJMSException("IO Error when restoring.");
  +        jmse.setLinkedException(e);
  +        throw jmse;
  +      }
  +    }
   
  -             TreeSet committingTXs = txLog.restore();
  +    if(!txs.isEmpty())
  +      this.tidcounter = ((Long)txs.last()).longValue()+1;
  +
                HashMap clone;
                synchronized (messageLogs) {
                        clone = (HashMap) messageLogs.clone();
  @@ -186,85 +215,55 @@
   
                Iterator iter = clone.values().iterator();
                while (iter.hasNext()) {
  -
                        LogInfo logInfo = (LogInfo)iter.next();
  -
                        JMSDestination q = 
server.getJMSDestination(logInfo.destination);
  -
  -                     SpyMessage rebuild[] = logInfo.log.restore(committingTXs);
  -
  +                     SpyMessage rebuild[] = logInfo.log.restore(txs);
                        //TODO: make sure this lock is good enough
                        synchronized (q) {
                                for (int i = 0; i < rebuild.length; i++) {
  +          if(logInfo.destination instanceof org.jbossmq.SpyTopic)
  +            rebuild[i].durableSubscriberID = 
((org.jbossmq.SpyTopic)logInfo.destination).getDurableSubscriptionID();
                                        q.restoreMessage(rebuild[i]);
                                }
                        }
                }
   
  +    //all txs now committed or rolled back so delete tx files
  +    for(int i=0;i<transactFiles.length;i++){
  +      if(transactFiles[i] != null)
  +        deleteTxFile(transactFiles[i]);
  +    }
        }
  -
  -/**
  - * Insert the method's description here.
  - * Creation date: (6/27/2001 12:53:12 AM)
  - * @param newDataDirectory java.lang.String
  - */
  -public void setDataDirectory(java.lang.String newDataDirectory) {
  -     dataDirectory = newDataDirectory;
  -}
  -
  -     public void startService() throws Exception {
  -
  -             JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );                
  -             restore(server);
  -             
  -     }
  -
  -     public void add(org.jbossmq.SpyMessage message, Long txId) throws 
javax.jms.JMSException {
  -
  -             LogInfo logInfo;
  -
  -             synchronized (messageLogs) {
  -                     logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination());
  -             }
   
  -             if (logInfo == null) {
  -             category.debug("Destination was not initialized : "+ 
message.getJMSDestination());
  -                     throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  +     public void initQueue( SpyDestination dest ) throws javax.jms.JMSException {
  +             try {
  +                     URL logDir = new URL(dataDirURL, dest.toString());
  +                     MessageLog log = new MessageLog(logDir.getFile());
  +                     LogInfo info = new LogInfo(log, dest);
  +      synchronized(messageLogs){
  +                       messageLogs.put(dest.toString(), info);
  +      }
  +             } catch (javax.jms.JMSException e) {
  +                     throw e;
  +             } catch (Exception e) {
  +                     javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
  +                     newE.setLinkedException(e);
  +                     throw newE;
                }
  -
  -       logInfo.log.add(message, txId);
  -
  -     if( txId != null ) {
  -       LinkedList tasks;
  -       synchronized( transactedTasks ) {
  -             tasks = (LinkedList)transactedTasks.get(txId);
  -       }
  -       if( tasks == null )
  -             throw new javax.jms.JMSException("Transaction is not active 5.");
  -       synchronized(tasks){
  -             tasks.addLast(new Transaction(true, logInfo, message, txId));
  -       }
  -     }
  -
        }
  -
  -     public void destroyQueue( SpyDestination dest) throws javax.jms.JMSException {
   
  +     public void destroyQueue( SpyDestination dest ) throws javax.jms.JMSException {
                try {
  -
                        URL logDir = new URL(dataDirURL, dest.toString());
                        java.io.File file = new java.io.File(logDir.getFile());
  -
  -       LogInfo logInfo;
  -       synchronized(messageLogs){
  -                       logInfo = (LogInfo)messageLogs.remove(""+dest);
  -       }
  +      LogInfo logInfo;
  +      synchronized(messageLogs){
  +                       logInfo = (LogInfo)messageLogs.remove(dest.toString());
  +      }
                        if( logInfo == null )
                                throw new JMSException("The persistence log was never 
initialized");
  -
                        logInfo.log.close();
                        file.delete();
  -
                } catch (javax.jms.JMSException e) {
                        throw e;
                } catch (Exception e) {
  @@ -272,57 +271,189 @@
                        newE.setLinkedException(e);
                        throw newE;
                }
  -
        }
  -
  -     public void initQueue(SpyDestination dest) throws javax.jms.JMSException {
   
  -         try {
  +  protected long tidcounter = Long.MIN_VALUE;
   
  -             URL logDir= new URL(dataDirURL, dest.toString());
  -             MessageLog log= new MessageLog(logDir.getFile());
  +     public org.jbossmq.pm.Tx createPersistentTx() throws javax.jms.JMSException {
  +    org.jbossmq.pm.Tx txId = null;
  +             synchronized (transactedTasks) {
  +      txId = new org.jbossmq.pm.Tx(tidcounter++);
  +      transactedTasks.put(txId, getTxInfo(createTxFile(txId)));
  +             }
  +             return txId;
  +     }
   
  -             LogInfo info= new LogInfo(log, dest);
  +  protected File createTxFile(org.jbossmq.pm.Tx txId) throws javax.jms.JMSException 
{
  +    try{
  +      File file = new File(dataDirFile,txId.toString());
  +      if(!file.createNewFile())
  +        throw new javax.jms.JMSException("Error creating tx file.");
  +      return file;
  +    }catch(IOException e){
  +      JMSException newE = new SpyJMSException("Unable to create committing 
transaction record.");
  +      newE.setLinkedException(e);
  +      throw newE;
  +    }
  +  }
  +
  +  protected void deleteTxFile(File file) throws javax.jms.JMSException {
  +    if(!file.delete()){
  +      Thread.yield();
  +      if(file.exists() && !file.delete()){
  +        throw new javax.jms.JMSException("Unable to delete committing transaction 
record.");
  +      }
  +    }
  +  }
  +
  +  protected java.util.ArrayList readTxFile(File file) throws javax.jms.JMSException 
{
  +    try{
  +      java.util.ArrayList result = new java.util.ArrayList();
  +      java.io.RandomAccessFile raf = new java.io.RandomAccessFile(file,"r");
  +      try{
  +        while(true){
  +          result.add(raf.readUTF());
  +        }
  +      }catch(java.io.EOFException e){
  +      }
  +      raf.close();
  +      return result;
  +    }catch(IOException e){
  +      JMSException newE = new SpyJMSException("Unable to read committing 
transaction record.");
  +      newE.setLinkedException(e);
  +      throw newE;
  +    }
  +  }
   
  -             category.debug("Initializing persistence for destination: "+ dest);
  -             synchronized (messageLogs) {
  -                 messageLogs.put("" + dest, info);
  -             }
  +     public void commitPersistentTx(org.jbossmq.pm.Tx txId) throws 
javax.jms.JMSException {
  +             TxInfo info;
  +             synchronized (transactedTasks) {
  +                     info = (TxInfo)transactedTasks.remove(txId);
  +             }
  +    //ensure record of tx exists
  +    try{
  +      info.raf.close();
  +    }catch(IOException e){
  +      JMSException jmse = new SpyJMSException("IO Error when closing raf for tx.");
  +      jmse.setLinkedException(e);
  +      throw jmse;
  +    }
  +    synchronized(info.tasks){
  +      Iterator iter = info.tasks.iterator();
  +      while( iter.hasNext() ) {
  +        Transaction task = (Transaction)iter.next();
  +        task.commit();
  +      }
  +    }
  +    deleteTxFile(info.txf);
  +    releaseTxInfo(info);
  +     }
   
  -         } catch (javax.jms.JMSException e) {
  -             throw e;
  -         } catch (Exception e) {
  -             javax.jms.JMSException newE= new javax.jms.JMSException("Invalid 
configuration.");
  -             newE.setLinkedException(e);
  -             throw newE;
  -         }
  +     public void rollbackPersistentTx(org.jbossmq.pm.Tx txId) throws 
javax.jms.JMSException {
  +             TxInfo info;
  +             synchronized (transactedTasks) {
  +                     info = (TxInfo)transactedTasks.remove(txId);
  +             }
  +    //ensure record of tx exists
  +    try{
  +      info.raf.close();
  +    }catch(IOException e){
  +      JMSException jmse = new SpyJMSException("IO Error when closing raf for tx.");
  +      jmse.setLinkedException(e);
  +      throw jmse;
  +    }
  +    synchronized(info.tasks){
  +      Iterator iter = info.tasks.iterator();
  +      while( iter.hasNext() ) {
  +        Transaction task = (Transaction)iter.next();
  +        task.rollback();
  +      }
  +    }
  +    deleteTxFile(info.txf);
  +    releaseTxInfo(info);
  +     }
   
  +     public void add(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId) throws 
javax.jms.JMSException {
  +             LogInfo logInfo;
  +             synchronized (messageLogs) {
  +                     logInfo = (LogInfo) 
messageLogs.get(message.getJMSDestination().toString());
  +             }
  +             if (logInfo == null)
  +                     throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  +       logInfo.log.add(message, txId);
  +    if( txId == null){
  +      logInfo.log.finishAdd(message, txId);
  +    }else {
  +      TxInfo info;
  +      synchronized( transactedTasks ) {
  +        info = (TxInfo)transactedTasks.get(txId);
  +      }
  +      if( info == null )
  +        throw new javax.jms.JMSException("Transaction is not active 5.");
  +      synchronized(info.tasks){
  +        info.tasks.addLast(new Transaction(true, logInfo, message, txId));
  +      }
  +    }
        }
   
  -     public void remove(org.jbossmq.SpyMessage message, Long txId) throws 
javax.jms.JMSException {
  +     public void remove(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId) 
throws javax.jms.JMSException {
   
                LogInfo logInfo;
   
                synchronized (messageLogs) {
  -                     logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination());
  +                     logInfo = (LogInfo) 
messageLogs.get(message.getJMSDestination().toString());
                }
   
                if (logInfo == null)
                        throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
   
  -     if( txId == null )
  -               logInfo.log.remove(message, txId);
  -     else {
  -       LinkedList tasks;
  -       synchronized (transactedTasks) {
  -             tasks = (LinkedList)transactedTasks.get(txId);
  -       }
  -       if( tasks == null )
  -             throw new javax.jms.JMSException("Transaction is not active 6.");
  -       synchronized(tasks){
  -             tasks.addLast(new Transaction(false, logInfo, message, txId));
  -       }
  -     }
  +    logInfo.log.remove(message, txId);
  +    if( txId == null )
  +               logInfo.log.finishRemove(message, txId);
  +    else {
  +      TxInfo info;
  +      synchronized( transactedTasks ) {
  +        info = (TxInfo)transactedTasks.get(txId);
  +      }
  +      if( info == null )
  +        throw new javax.jms.JMSException("Transaction is not active 6.");
  +      try{
  +        info.raf.writeUTF(message.getJMSMessageID());
  +      }catch(IOException e){
  +        JMSException jmse = new SpyJMSException("IO Error when recording remove in 
txs raf.");
  +        jmse.setLinkedException(e);
  +        throw jmse;
  +      }
  +      synchronized(info.tasks){
  +        info.tasks.addLast(new Transaction(false, logInfo, message, txId));
  +      }
  +    }
   
        }
  +
  +  class Transaction {
  +    private LogInfo logInfo;
  +    private SpyMessage message;
  +    private org.jbossmq.pm.Tx txId;
  +    private boolean add;
  +    public Transaction(boolean add, LogInfo logInfo, SpyMessage message, 
org.jbossmq.pm.Tx txId) {
  +      this.add = add;
  +      this.logInfo = logInfo;
  +      this.message = message;
  +      this.txId = txId;
  +    }
  +    public void commit() throws JMSException {
  +      if(add)
  +        logInfo.log.finishAdd(message, txId);
  +      else
  +        logInfo.log.finishRemove(message, txId);
  +    }
  +    public void rollback() throws JMSException {
  +      if(add)
  +                 logInfo.log.undoAdd(message, txId);
  +      else
  +        logInfo.log.undoRemove(message, txId);
  +    }
  +  }
  +
   }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to