User: pkendall
  Date: 01/08/08 18:18:28

  Modified:    src/main/org/jbossmq/pm/rollinglogged IntegrityLog.java
                        PersistenceManager.java
                        PersistenceManagerMBean.java SpyMessageLog.java
                        SpyTxLog.java
  Removed:     src/main/org/jbossmq/pm/rollinglogged
                        ObjectIntegrityLog.java
  Log:
  Major updates (especially to topics).
  Speed improvements.
  Make JVM IL work (by using a singleton JMSServer).
  Message Listeners re-implemented using client-side thread.
  
  Revision  Changes    Path
  1.2       +173 -169  jbossmq/src/main/org/jbossmq/pm/rollinglogged/IntegrityLog.java
  
  Index: IntegrityLog.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/IntegrityLog.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- IntegrityLog.java 2001/07/31 20:17:52     1.1
  +++ IntegrityLog.java 2001/08/09 01:18:28     1.2
  @@ -6,150 +6,101 @@
    */
   package org.jbossmq.pm.rollinglogged;
   
  -import java.io.RandomAccessFile;
  -import java.io.OutputStream;
  -import java.io.InputStream;
  -import java.io.IOException;
  -import java.io.File;
  +import java.io.*;
   
  -
  +import org.jbossmq.*;
   /**
    * This class is used to create a log file which which will will garantee
  - * it's integrety up to the last commit point.
  - *
  - * The InputStream returned by getInputStream() will read
  - * data placed into the log with the OutputStream returned by
  - * getOutputStream().  The EOF for the InputStream is the
  - * last commited point of the OutputStream.
  + * it's integrety up to the last commit point.  An optimised version of the
  + * integrityLog in the logged persistence.
    *
  - * @author: Hiram Chirino ([EMAIL PROTECTED])
  - * @version $Revision: 1.1 $
  + * @author: David Maplesden ([EMAIL PROTECTED])
  + * @version $Revision: 1.2 $
    */
  -public class IntegrityLog {
  +public class IntegrityLog{
   
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
  -     private static final int HEADER_SIZE=16; // in bytes
  -
  -     // Header related stuff
  -     private long firstRecordPos;
  -     private long nextRecordPos;
  -     private byte headerBytes[]=new byte[HEADER_SIZE];
   
        private RandomAccessFile raf;
     private File f;
  -
  -     private LogOutputStream outputStream;
  -     private LogInputStream inputStream;
  -
  -
  -     /////////////////////////////////////////////////////////////////////
  -     // Helper Inner Classes
  -     /////////////////////////////////////////////////////////////////////
  -     class LogInputStream extends InputStream {
  -
  -             boolean closed = false;
  -             long inputPos = 0;
  +  private ObjectOutput objectOutput;
   
  -             public long getFilePointer() {
  -                     return inputPos;
  +  ////////////////////////////////////////////////////////////
  +  //  Helper Inner classes.                                 //
  +  ////////////////////////////////////////////////////////////
  +
  +  class MyOutputStream extends OutputStream{
  +    public void close() throws IOException{
  +      flush();
  +    }
  +             public void write(int b) throws IOException {
  +                     raf.write( (byte)b );
                }
  -             public void close() throws IOException {
  -                     super.close();
  -                     closed = true;
  +             public void write(byte bytes[], int off, int len) throws IOException {
  +                     raf.write( bytes, off, len );
                }
  -             public int read() throws IOException {
  -                     inputPos = Math.max(inputPos, firstRecordPos);
  -                     int rc = IntegrityLog.this.read(inputPos);
  -                     if( rc >= 0 )
  -                             inputPos ++;
  -                     return rc;
  +     }
  +
  +     class MyObjectOutputStream extends ObjectOutputStream {
  +             MyObjectOutputStream(OutputStream os) throws IOException {
  +                     super(os);
                }
  -             public int read(byte bytes[], int off, int len) throws IOException {
  -                     inputPos = Math.max(inputPos, firstRecordPos);
  -                     int rc = IntegrityLog.this.read(inputPos, bytes, off, len);
  -                     if( rc >= 0 )
  -                             inputPos += rc;
  -                     return rc;
  +             protected void writeStreamHeader() {
                }
        }
   
  -     class LogOutputStream extends OutputStream {
  -             boolean closed = false;
  -             public long getFilePointer() {
  -                     return nextRecordPos;
  -             }
  -             public void close() throws IOException {
  -                     super.close();
  -                     closed = true;
  +     class MyObjectInputStream extends ObjectInputStream {
  +             MyObjectInputStream(InputStream is) throws IOException {
  +                     super(is);
                }
  -             public void write(int b) throws IOException {
  -                     IntegrityLog.this.write( (byte)b );
  +             protected void readStreamHeader() {
                }
  -             public void write(byte bytes[], int off, int len) throws IOException {
  -                     IntegrityLog.this.write( bytes, off, len );
  +     }
  +
  +  class MyInputStream extends InputStream{
  +    public void close() throws IOException{
  +    }
  +             public int read() throws IOException {
  +                     return raf.read( );
                }
  +             public int read(byte bytes[], int off, int len) throws IOException {
  +                     return raf.read( bytes, off, len );
  +             }
  +  }
  +
  +     class MessageAddedRecord implements Serializable {
  +             long messageId;
  +             boolean isTransacted;
  +             long transactionId;
  +             SpyMessage message;
  +             private final static long serialVersionUID = 235726945332013954L;
        }
   
  +     class MessageRemovedRecord implements Serializable {
  +             boolean isTransacted;
  +             long transactionId;
  +             long messageId;
  +             private final static long serialVersionUID = 235726945332013955L;
  +     }
   
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public IntegrityLog(String fileName) throws IOException {
                f = new File(fileName);
  -             boolean exists = f.isFile();
  -
                raf = new RandomAccessFile(f, "rw");
  -             if( exists ) {
  -                     loadHeader();
  -             } else {
  -                     initHeader();
  -             }
  +    this.objectOutput = new MyObjectOutputStream(new MyOutputStream());
  +             seekEnd();
        }
   
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
  -     public LogInputStream getInputStream() {
  -             if ( inputStream==null || inputStream.closed ) {
  -                     inputStream = new LogInputStream();
  -             }
  -             return inputStream;
  -     }
  -
  -     public LogOutputStream getOutputStream() throws IOException {
  -             if ( outputStream==null || outputStream.closed ) {
  -                     outputStream = new LogOutputStream();
  -             }
  -             return outputStream;
  -     }
   
        public void commit() throws IOException {
  -
  -             headerBytes[0] = (byte)((firstRecordPos >>> 56) & 0xFF);
  -             headerBytes[1] = (byte)((firstRecordPos >>> 48) & 0xFF);
  -             headerBytes[2] = (byte)((firstRecordPos >>> 40) & 0xFF);
  -             headerBytes[3] = (byte)((firstRecordPos >>> 32) & 0xFF);
  -             headerBytes[4] = (byte)((firstRecordPos >>> 24) & 0xFF);
  -             headerBytes[5] = (byte)((firstRecordPos >>> 16) & 0xFF);
  -             headerBytes[6] = (byte)((firstRecordPos >>>  8) & 0xFF);
  -             headerBytes[7] = (byte)((firstRecordPos >>>  0) & 0xFF);
  -             headerBytes[8] = (byte)((nextRecordPos >>> 56) & 0xFF);
  -             headerBytes[9] = (byte)((nextRecordPos >>> 48) & 0xFF);
  -             headerBytes[10] =(byte)((nextRecordPos >>> 40) & 0xFF);
  -             headerBytes[11] =(byte)((nextRecordPos >>> 32) & 0xFF);
  -             headerBytes[12] =(byte)((nextRecordPos >>> 24) & 0xFF);
  -             headerBytes[13] =(byte)((nextRecordPos >>> 16) & 0xFF);
  -             headerBytes[14] =(byte)((nextRecordPos >>>  8) & 0xFF);
  -             headerBytes[15] =(byte)((nextRecordPos >>>  0) & 0xFF);
  -
  -             raf.seek(0);
  -             raf.write(headerBytes);
  -     }
  -
  -     public void rollback() throws IOException {
  -             loadHeader();
  +    //raf.getFD().sync();
        }
   
     public void delete() throws IOException {
  @@ -160,79 +111,132 @@
                raf.close();
                raf = null;
        }
  -
  -     /////////////////////////////////////////////////////////////////////
  -     // Private Methods
  -     /////////////////////////////////////////////////////////////////////
  -     private long getBytesLeft(long offset) {
  -
  -             return nextRecordPos-offset;
  -
  -     }
  -
  -     private void initHeader() throws IOException {
  -
  -             firstRecordPos = HEADER_SIZE;
  -             nextRecordPos = HEADER_SIZE;
  -
  -             commit();
  -     }
  -
  -     private void loadHeader() throws IOException {
   
  -             raf.seek(0);
  -             firstRecordPos = raf.readLong();
  -             nextRecordPos = raf.readLong();
  -
  -     }
  -
  -     private int read(long offset) throws IOException {
  -
  -             if( offset >= nextRecordPos )
  -                     return -1;
  -
  -             if( raf.getFilePointer() != offset ) {
  -                     raf.seek(offset);
  -             }
  -
  -             int rc = raf.read();
  -             return rc;
  -
  -     }
  -
  -     private int read(long offset, byte bytes[], int off, int len) throws 
IOException {
  -
  -             if( offset >= nextRecordPos )
  -                     return -1;
  +  protected static final byte TX = 0;
  +  protected static final byte ADD = 1;
  +  protected static final byte REMOVE = 2;
  +
  +  public synchronized void addTx(org.jbossmq.pm.Tx tx) throws IOException{
  +    raf.writeByte(TX);
  +    raf.writeLong(tx.longValue());
  +  }
   
  -             len = (int)Math.min(len, getBytesLeft(offset));
  +  public synchronized void add(long messageID, boolean isTransacted, long txId, 
SpyMessage message)throws IOException{
  +    raf.writeByte(ADD);
  +    raf.writeLong(messageID);
  +    raf.writeBoolean(isTransacted);
  +    raf.writeLong(txId);
  +    SpyMessage.writeMessage(message,objectOutput);
  +    objectOutput.flush();
  +  }
   
  -             if( raf.getFilePointer() != offset ) {
  -                     raf.seek(offset);
  -             }
  +  public synchronized void remove(long messageID, boolean isTransacted, long 
txId)throws IOException{
  +    raf.writeByte(REMOVE);
  +    raf.writeLong(messageID);
  +    raf.writeBoolean(isTransacted);
  +    raf.writeLong(txId);
  +  }
   
  -             int rc = raf.read(bytes, off, len);
  -             return rc;
  +  public void skipNextEntry(ObjectInput in) throws IOException{
  +    byte type = raf.readByte();
  +    switch(type){
  +      case TX:
  +        raf.readLong();
  +        return;
  +      case ADD:
  +        raf.readLong();
  +        raf.readBoolean();
  +        raf.readLong();
  +        SpyMessage.readMessage(in);
  +        return;
  +      case REMOVE:
  +        raf.readLong();
  +        raf.readBoolean();
  +        raf.readLong();
  +        return;
  +      default:
  +        throw new java.io.IOException("Error in log file format.");
  +    }
  +  }
   
  -     }
  +  public java.util.LinkedList toIndex() throws IOException{
  +    raf.seek(0);
  +    long length = raf.length();
  +    long pos = 0;
  +    ObjectInput in = new MyObjectInputStream(new MyInputStream());
  +    java.util.LinkedList ll = new java.util.LinkedList();
  +    try{
  +      while(pos < length){
  +        ll.add(readNextEntry(in));
  +        pos = raf.getFilePointer();
  +      }
  +    }catch(EOFException e){
  +      //incomplete record
  +    }
  +    in.close();
  +    raf.seek(pos);
   
  -     private void write(byte []record, int off, int len) throws IOException {
  -             if( raf.getFilePointer() != nextRecordPos ) {
  -                     raf.seek(nextRecordPos);
  -             }
  +    return ll;
  +  }
   
  -             raf.write(record, off, len);
  -             nextRecordPos+=len;
  -     }
  +  public java.util.TreeSet toTreeSet() throws IOException{
  +    raf.seek(0);
  +    long length = raf.length();
  +    long pos = 0;
  +    ObjectInput in = new MyObjectInputStream(new MyInputStream());
  +    java.util.TreeSet ll = new java.util.TreeSet();
  +    try{
  +      while(pos < length){
  +        ll.add(readNextEntry(in));
  +        pos = raf.getFilePointer();
  +      }
  +    }catch(EOFException e){
  +      //incomplete record
  +    }
  +    in.close();
  +    raf.seek(pos);
   
  -     private void write(byte b) throws IOException {
  +    return ll;
  +  }
   
  -             if( raf.getFilePointer() != nextRecordPos ) {
  -                     raf.seek(nextRecordPos);
  -             }
  +  public Object readNextEntry(ObjectInput in) throws IOException{
  +    byte type = raf.readByte();
  +    switch(type){
  +      case TX:
  +        return new org.jbossmq.pm.Tx(raf.readLong());
  +      case ADD:
  +        MessageAddedRecord add = new MessageAddedRecord();
  +        add.messageId = raf.readLong();
  +        add.isTransacted = raf.readBoolean();
  +        add.transactionId = raf.readLong();
  +        add.message = SpyMessage.readMessage(in);
  +        return add;
  +      case REMOVE:
  +        MessageRemovedRecord remove = new MessageRemovedRecord();
  +        remove.messageId = raf.readLong();
  +        remove.isTransacted = raf.readBoolean();
  +        remove.transactionId = raf.readLong();
  +        return remove;
  +      default:
  +        throw new java.io.IOException("Error in log file format.");
  +    }
  +  }
   
  -             raf.write(b);
  -             nextRecordPos++;
  +     private void seekEnd() throws IOException {
  +    raf.seek(0);
  +    long length = raf.length();
  +    long pos = 0;
  +    ObjectInput in = new MyObjectInputStream(new MyInputStream());
  +    try{
  +      while(pos < length){
  +        skipNextEntry(in);
  +        pos = raf.getFilePointer();
  +      }
  +    }catch(EOFException e){
  +      //incomplete record, must have been due to program failure during write.
  +    }
  +    in.close();
  +    raf.seek(pos);
        }
   
   }
  
  
  
  1.3       +68 -19    
jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManager.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- PersistenceManager.java   2001/08/01 20:22:35     1.2
  +++ PersistenceManager.java   2001/08/09 01:18:28     1.3
  @@ -14,8 +14,8 @@
   import org.jbossmq.server.JMSDestination;
   import org.jbossmq.SpyMessage;
   import org.jbossmq.SpyDestination;
  +import org.jbossmq.SpyJMSException;
   
  -
   import javax.naming.InitialContext;
   import org.jbossmq.pm.TxManager;
   import org.jboss.util.ServiceMBeanSupport;
  @@ -25,9 +25,9 @@
   /**
    *   This class manages all persistence related services.
    *
  - *   @author David Maplesden
  + *   @author David Maplesden ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class PersistenceManager
        extends ServiceMBeanSupport
  @@ -71,28 +71,69 @@
        }
   
     static class TxInfo {
  -    Long txId;
  +    org.jbossmq.pm.Tx txId;
       LinkedList addMessages = new LinkedList();
       LinkedList ackMessages = new LinkedList();
       SpyTxLog log;
  -    TxInfo(Long txId,SpyTxLog log){
  +    TxInfo(org.jbossmq.pm.Tx txId,SpyTxLog log){
         this.txId = txId;
         this.log = log;
       }
     }
  +
  +  protected java.util.ArrayList listPool = new java.util.ArrayList();
  +  protected java.util.ArrayList txPool = new java.util.ArrayList();
  +
  +  protected static int MAX_POOL_SIZE = 50;
  +
  +  protected TxInfo getTxInfo(org.jbossmq.pm.Tx txId, SpyTxLog txLog){
  +    if(listPool.isEmpty()){
  +      return new TxInfo(txId,txLog);
  +    }else{
  +      TxInfo info = (TxInfo)listPool.remove(listPool.size()-1);
  +      info.txId = txId;
  +      info.log = txLog;
  +      return info;
  +    }
  +  }
  +
  +  protected void releaseTxInfo(TxInfo list){
  +    if(listPool.size() < MAX_POOL_SIZE){
  +      list.ackMessages.clear();
  +      list.addMessages.clear();
  +      listPool.add(list);
  +    }
  +  }
   
  -     public Long createPersistentTx() throws javax.jms.JMSException {
  -    Long txId = null;
  +  protected org.jbossmq.pm.Tx getTx(long value){
  +    if(txPool.isEmpty()){
  +      return new org.jbossmq.pm.Tx(value);
  +    }else{
  +      org.jbossmq.pm.Tx tx = (org.jbossmq.pm.Tx)txPool.remove(listPool.size()-1);
  +      tx.setValue(value);
  +      return tx;
  +    }
  +  }
  +
  +  protected void releaseTx(org.jbossmq.pm.Tx tx){
  +    if(txPool.size() < MAX_POOL_SIZE){
  +      txPool.add(tx);
  +    }
  +  }
  +
  +     public org.jbossmq.pm.Tx createPersistentTx() throws javax.jms.JMSException {
  +    org.jbossmq.pm.Tx txId = null;
       SpyTxLog txLog = currentTxLog;
       synchronized(transToTxLogs){
  -      txId = new Long(++nextTxId);
  -      transToTxLogs.put(txId,new TxInfo(txId,txLog));
  +      txId = getTx(++nextTxId);
  +      transToTxLogs.put(txId,getTxInfo(txId,txLog));
       }
       txLog.createTx();
       return txId;
        }
   
  -     public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
  +     public void commitPersistentTx(org.jbossmq.pm.Tx txId) throws 
javax.jms.JMSException {
  +//System.out.println("Committing TX "+Long.toHexString(txId.longValue()));
       TxInfo info = null;
       LinkedList messagesToDelete = null;
       synchronized(transToTxLogs){
  @@ -101,10 +142,14 @@
       }
       deleteMessages(messagesToDelete);
                info.log.commitTx(txId);
  +    synchronized(transToTxLogs){
  +      releaseTx(txId);
  +      releaseTxInfo(info);
  +    }
       checkCleanup(info.log);
        }
   
  -     public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
  +     public void rollbackPersistentTx(org.jbossmq.pm.Tx txId) throws 
javax.jms.JMSException {
       TxInfo info = null;
       LinkedList messagesToDelete = null;
       synchronized(transToTxLogs){
  @@ -113,6 +158,10 @@
       }
       deleteMessages(messagesToDelete);
                info.log.rollbackTx(txId);
  +    synchronized(transToTxLogs){
  +      releaseTx(txId);
  +      releaseTxInfo(info);
  +    }
       checkCleanup(info.log);
        }
   
  @@ -160,7 +209,7 @@
         }
         checkCleanup(oldLog);
       }catch(java.net.MalformedURLException e){
  -      JMSException jme = new JMSException("Error rolling over logs to new files.");
  +      JMSException jme = new SpyJMSException("Error rolling over logs to new 
files.");
         jme.setLinkedException(e);
         throw jme;
       }
  @@ -224,7 +273,7 @@
           log = (SpyMessageLog)logs.remove(key);
         }
         if( log == null )
  -        throw new JMSException("The persistence log was never initialized");
  +        throw new SpyJMSException("The persistence log was never initialized");
         log.close();
         log.delete();
   
  @@ -257,8 +306,8 @@
   
        }
   
  -     public void add(org.jbossmq.SpyMessage message, Long txId) throws 
javax.jms.JMSException {
  -
  +     public void add(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId) throws 
javax.jms.JMSException {
  +//System.out.println("Add message "+Long.toHexString(message.messageId)+" in trans 
"+Long.toHexString(txId.longValue())+" to "+message.getJMSDestination());
                LogInfo logInfo;
   
       SpyTxLog txLog = null;
  @@ -275,7 +324,7 @@
         logs = (HashMap)messageLogs.get(txLog);
                }
       synchronized(logs){
  -                     logInfo = (LogInfo) logs.get(""+message.getJMSDestination());
  +                     logInfo = (LogInfo) 
logs.get(message.getJMSDestination().toString());
                }
   
                if (logInfo == null)
  @@ -295,14 +344,15 @@
       checkRollOver();
        }
   
  -     public void remove(org.jbossmq.SpyMessage message, Long txId) throws 
javax.jms.JMSException {
  +     public void remove(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId) 
throws javax.jms.JMSException {
  +//System.out.println("Removing message "+Long.toHexString(message.messageId)+" in 
trans "+Long.toHexString(txId.longValue())+" from "+message.getJMSDestination());
   
                LogInfo logInfo;
   
       SpyTxLog txLog = ((LogInfo)message.persistData).txLog;
                synchronized (messageLogs) {
         HashMap logs = (HashMap)messageLogs.get(txLog);
  -                     logInfo = (LogInfo) logs.get(""+message.getJMSDestination());
  +                     logInfo = (LogInfo) 
logs.get(message.getJMSDestination().toString());
                }
   
                if (logInfo == null)
  @@ -354,7 +404,6 @@
       if(DEBUG) System.out.println("Using new rolling logged persistence manager.");
   
                URL configFile = getClass().getClassLoader().getResource("jboss.jcml");
  -
                dataDirURL = new URL(configFile, dataDirectory);
   
            //Get an InitialContext
  @@ -386,7 +435,7 @@
       }
   
       if(!commitedTxs.isEmpty())
  -      nextTxId = ((Long)commitedTxs.last()).longValue();
  +      nextTxId = ((org.jbossmq.pm.Tx)commitedTxs.last()).longValue();
   
       for(int i=0;i<dataFiles.length;++i){
         String name = dataFiles[i].getName();
  
  
  
  1.2       +0 -0      
jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManagerMBean.java
  
  Index: PersistenceManagerMBean.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManagerMBean.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- PersistenceManagerMBean.java      2001/07/31 20:17:52     1.1
  +++ PersistenceManagerMBean.java      2001/08/09 01:18:28     1.2
  @@ -13,7 +13,7 @@
    *
    *   @see <related>
    *   @author Vincent Sheffer ([EMAIL PROTECTED])
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public interface PersistenceManagerMBean
      extends org.jboss.util.ServiceMBean
  
  
  
  1.2       +23 -50    jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyMessageLog.java
  
  Index: SpyMessageLog.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyMessageLog.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyMessageLog.java        2001/07/31 20:17:52     1.1
  +++ SpyMessageLog.java        2001/08/09 01:18:28     1.2
  @@ -6,6 +6,8 @@
    */
   package org.jbossmq.pm.rollinglogged;
   
  +import org.jbossmq.SpyJMSException;
  +
   import java.io.IOException;
   import java.io.Serializable;
   import javax.jms.JMSException;
  @@ -18,42 +20,21 @@
    * provider failure.  Integrety is kept by the use of an ObjectIntegrityLog.
    *
    * @author: Hiram Chirino ([EMAIL PROTECTED])
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class SpyMessageLog {
   
        /////////////////////////////////////////////////////////////////////
        // Attributes
  -     /////////////////////////////////////////////////////////////////////
  -     private ObjectIntegrityLog transactionLog;
  -     private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
  -     private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
  -
        /////////////////////////////////////////////////////////////////////
  -     // Helper Inner Classes
  -     /////////////////////////////////////////////////////////////////////
  -     static class MessageAddedRecord implements Serializable {
  -             long messageId;
  -             boolean isTransacted;
  -             long transactionId;
  -             SpyMessage message;
  -             private final static long serialVersionUID = 235726945332013954L;
  -     }
  +     private IntegrityLog transactionLog;
   
  -     static class MessageRemovedRecord implements Serializable {
  -             boolean isTransacted;
  -             long transactionId;
  -             long messageId;
  -             private final static long serialVersionUID = 235726945332013955L;
  -     }
  -
  -
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public SpyMessageLog(String fileName) throws JMSException {
                try {
  -                     transactionLog = new ObjectIntegrityLog(fileName);
  +                     transactionLog = new IntegrityLog(fileName);
                } catch ( IOException e ) {
                        throwJMSException("Could not open the queue's tranaction log: 
"+fileName,e);
                }
  @@ -79,19 +60,14 @@
                }
     }
   
  -     synchronized public void add( SpyMessage message, Long transactionId ) throws 
JMSException {
  +     synchronized public void add( SpyMessage message, org.jbossmq.pm.Tx 
transactionId ) throws JMSException {
                try{
   
  -                     messageAddedRecord.message = message;
  -                     messageAddedRecord.messageId = message.messageId;
                        if( transactionId == null )     {
  -                             messageAddedRecord.isTransacted = false;
  +        transactionLog.add(message.messageId,false,-1,message);
                        } else {
  -                             messageAddedRecord.isTransacted = true;
  -                             messageAddedRecord.transactionId = 
transactionId.longValue();
  +        
transactionLog.add(message.messageId,true,transactionId.longValue(),message);
                        }
  -
  -                     transactionLog.add(messageAddedRecord);
                        transactionLog.commit();
   
                } catch ( IOException e ) {
  @@ -100,17 +76,14 @@
   
        }
   
  -     synchronized public void remove( SpyMessage message, Long transactionId ) 
throws JMSException {
  +     synchronized public void remove( SpyMessage message, org.jbossmq.pm.Tx 
transactionId ) throws JMSException {
                try{
   
  -                     messageRemovedRecord.messageId = message.messageId;
                        if( transactionId == null ) {
  -                             messageRemovedRecord.isTransacted = false;
  +        transactionLog.remove(message.messageId,false,-1);
                        } else {
  -                             messageRemovedRecord.isTransacted = true;
  -                             messageRemovedRecord.transactionId = 
transactionId.longValue();
  +        transactionLog.remove(message.messageId,true,transactionId.longValue());
                        }
  -                     transactionLog.add(messageRemovedRecord);
                        transactionLog.commit();
   
                } catch ( IOException e ) {
  @@ -124,29 +97,29 @@
                java.util.HashMap messageIndex = new java.util.HashMap();
   
                try {
  -                     ObjectIntegrityLog.IndexItem objects[] = 
transactionLog.toIndex();
  +                     java.util.LinkedList objects = transactionLog.toIndex();
   
  -                     for( int i=0; i < objects.length; i++ ) {
  +                     for(java.util.Iterator it = objects.iterator();it.hasNext(); ) 
{
   
  -                             Object o = objects[i].record;
  -                             if( o instanceof MessageAddedRecord ) {
  +                             Object o = it.next();
  +                             if( o instanceof IntegrityLog.MessageAddedRecord ) {
   
  -                                     MessageAddedRecord r = (MessageAddedRecord)o;
  +                                     IntegrityLog.MessageAddedRecord r = 
(IntegrityLog.MessageAddedRecord)o;
                                        r.message.messageId = r.messageId;
   
  -                                     if( r.isTransacted && !commited.contains(new 
Long(r.transactionId)) ) {
  +                                     if( r.isTransacted && !commited.contains(new 
org.jbossmq.pm.Tx(r.transactionId)) ) {
                                                // the TX this message was part of was 
not
                                                // commited... so drop this message
                                                continue;
                                        }
   
  -                                     messageIndex.put( new Long(r.messageId), 
objects[i]);
  +                                     messageIndex.put( new Long(r.messageId), o);
   
  -                             } else if( o instanceof MessageRemovedRecord ) {
  +                             } else if( o instanceof 
IntegrityLog.MessageRemovedRecord ) {
   
  -                                     MessageRemovedRecord r = 
(MessageRemovedRecord)o;
  +                                     IntegrityLog.MessageRemovedRecord r = 
(IntegrityLog.MessageRemovedRecord)o;
   
  -                                     if( r.isTransacted && !commited.contains(new 
Long(r.transactionId)) ) {
  +                                     if( r.isTransacted && !commited.contains(new 
org.jbossmq.pm.Tx(r.transactionId)) ) {
                                                // the TX this message was part of was 
not
                                                // commited... so drop this message
                                                continue;
  @@ -158,20 +131,20 @@
   
                        }
                } catch ( Exception e ) {
  +//      e.printStackTrace();
                        throwJMSException("Could not rebuild the queue from the 
queue's tranaction log.",e);
                }
   
                SpyMessage rc[] = new SpyMessage[messageIndex.size()];
                java.util.Iterator iter = messageIndex.values().iterator();
                for( int i=0; iter.hasNext(); i++ ) {
  -                     ObjectIntegrityLog.IndexItem item = 
(ObjectIntegrityLog.IndexItem)iter.next();
  -                     rc[i] = ((MessageAddedRecord)item.record).message;
  +                     rc[i] = ((IntegrityLog.MessageAddedRecord)iter.next()).message;
                }
                return rc;
        }
   
        private void throwJMSException(String message, Exception e) throws 
JMSException {
  -             JMSException newE = new JMSException(message);
  +             JMSException newE = new SpyJMSException(message);
                newE.setLinkedException(e);
                throw newE;
        }
  
  
  
  1.2       +8 -6      jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyTxLog.java
  
  Index: SpyTxLog.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyTxLog.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyTxLog.java     2001/07/31 20:17:52     1.1
  +++ SpyTxLog.java     2001/08/09 01:18:28     1.2
  @@ -6,6 +6,8 @@
    */
   package org.jbossmq.pm.rollinglogged;
   
  +import org.jbossmq.SpyJMSException;
  +
   import java.io.Serializable;
   import java.io.IOException;
   
  @@ -15,14 +17,14 @@
    * This is used to keep a log of commited transactions.
    *
    * @author: Hiram Chirino ([EMAIL PROTECTED])
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class SpyTxLog {
   
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
  -     private ObjectIntegrityLog transactionLog;
  +     private IntegrityLog transactionLog;
     private int liveTransactionCount = 0;
     private Object counterLock = new Object();
   
  @@ -31,7 +33,7 @@
        /////////////////////////////////////////////////////////////////////
        public SpyTxLog(String fileName) throws JMSException {
                try {
  -                     transactionLog = new ObjectIntegrityLog(fileName);
  +                     transactionLog = new IntegrityLog(fileName);
                } catch (IOException e) {
                        throwJMSException("Could not open the queue's tranaction log: 
" + fileName, e);
                }
  @@ -69,10 +71,10 @@
       }
        }
   
  -     synchronized public void commitTx(Long id) throws JMSException {
  +     synchronized public void commitTx(org.jbossmq.pm.Tx id) throws JMSException {
   
                try {
  -                     transactionLog.add(id);
  +                     transactionLog.addTx(id);
                        transactionLog.commit();
         synchronized(counterLock){
           --liveTransactionCount;
  @@ -91,7 +93,7 @@
                }
        }
   
  -     public void rollbackTx(Long txId) throws JMSException {
  +     public void rollbackTx(org.jbossmq.pm.Tx txId) throws JMSException {
       synchronized(counterLock){
         --liveTransactionCount;
       }
  @@ -101,7 +103,7 @@
        // Private Methods
        /////////////////////////////////////////////////////////////////////
        private void throwJMSException(String message, Exception e) throws 
JMSException {
  -             JMSException newE = new JMSException(message);
  +             JMSException newE = new SpyJMSException(message);
                newE.setLinkedException(e);
                throw newE;
        }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to