WOW!!!  now this gets me exited!  A rolling log file based persistence 
manager!  This should give use the performance edge of using a log files for 
persistence but removes all the scalablity problems of a linear logger.

Good Work Paul!

Hiram

>From: Paul Kendall <[EMAIL PROTECTED]>
>Reply-To: [EMAIL PROTECTED]
>To: [EMAIL PROTECTED]
>Subject: [JBoss-dev] CVS update: 
>jbossmq/src/main/org/jbossmq/pm/rollinglogged IntegrityLog.java 
>PersistenceManager.java PersistenceManagerMBean.java SpyMessageLog.java 
>SpyTxLog.java ObjectIntegrityLog.java
>Date: Wed, 08 Aug 2001 18:18:28 -0700
>
>   User: pkendall
>   Date: 01/08/08 18:18:28
>
>   Modified:    src/main/org/jbossmq/pm/rollinglogged IntegrityLog.java
>                         PersistenceManager.java
>                         PersistenceManagerMBean.java SpyMessageLog.java
>                         SpyTxLog.java
>   Removed:     src/main/org/jbossmq/pm/rollinglogged
>                         ObjectIntegrityLog.java
>   Log:
>   Major updates (especially to topics).
>   Speed improvements.
>   Make JVM IL work (by using a singleton JMSServer).
>   Message Listeners re-implemented using client-side thread.
>
>   Revision  Changes    Path
>   1.2       +173 -169  
>jbossmq/src/main/org/jbossmq/pm/rollinglogged/IntegrityLog.java
>
>   Index: IntegrityLog.java
>   ===================================================================
>   RCS file: 
>/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/IntegrityLog.java,v
>   retrieving revision 1.1
>   retrieving revision 1.2
>   diff -u -r1.1 -r1.2
>   --- IntegrityLog.java       2001/07/31 20:17:52     1.1
>   +++ IntegrityLog.java       2001/08/09 01:18:28     1.2
>   @@ -6,150 +6,101 @@
>     */
>    package org.jbossmq.pm.rollinglogged;
>
>   -import java.io.RandomAccessFile;
>   -import java.io.OutputStream;
>   -import java.io.InputStream;
>   -import java.io.IOException;
>   -import java.io.File;
>   +import java.io.*;
>
>   -
>   +import org.jbossmq.*;
>    /**
>     * This class is used to create a log file which which will will 
>garantee
>   - * it's integrety up to the last commit point.
>   - *
>   - * The InputStream returned by getInputStream() will read
>   - * data placed into the log with the OutputStream returned by
>   - * getOutputStream().  The EOF for the InputStream is the
>   - * last commited point of the OutputStream.
>   + * it's integrety up to the last commit point.  An optimised version of 
>the
>   + * integrityLog in the logged persistence.
>     *
>   - * @author: Hiram Chirino ([EMAIL PROTECTED])
>   - * @version $Revision: 1.1 $
>   + * @author: David Maplesden ([EMAIL PROTECTED])
>   + * @version $Revision: 1.2 $
>     */
>   -public class IntegrityLog {
>   +public class IntegrityLog{
>
>       /////////////////////////////////////////////////////////////////////
>       // Attributes
>       /////////////////////////////////////////////////////////////////////
>   -   private static final int HEADER_SIZE=16; // in bytes
>   -
>   -   // Header related stuff
>   -   private long firstRecordPos;
>   -   private long nextRecordPos;
>   -   private byte headerBytes[]=new byte[HEADER_SIZE];
>
>       private RandomAccessFile raf;
>      private File f;
>   -
>   -   private LogOutputStream outputStream;
>   -   private LogInputStream inputStream;
>   -
>   -
>   -   /////////////////////////////////////////////////////////////////////
>   -   // Helper Inner Classes
>   -   /////////////////////////////////////////////////////////////////////
>   -   class LogInputStream extends InputStream {
>   -
>   -           boolean closed = false;
>   -           long inputPos = 0;
>   +  private ObjectOutput objectOutput;
>
>   -           public long getFilePointer() {
>   -                   return inputPos;
>   +  ////////////////////////////////////////////////////////////
>   +  //  Helper Inner classes.                                 //
>   +  ////////////////////////////////////////////////////////////
>   +
>   +  class MyOutputStream extends OutputStream{
>   +    public void close() throws IOException{
>   +      flush();
>   +    }
>   +           public void write(int b) throws IOException {
>   +                   raf.write( (byte)b );
>               }
>   -           public void close() throws IOException {
>   -                   super.close();
>   -                   closed = true;
>   +           public void write(byte bytes[], int off, int len) throws IOException 
>{
>   +                   raf.write( bytes, off, len );
>               }
>   -           public int read() throws IOException {
>   -                   inputPos = Math.max(inputPos, firstRecordPos);
>   -                   int rc = IntegrityLog.this.read(inputPos);
>   -                   if( rc >= 0 )
>   -                           inputPos ++;
>   -                   return rc;
>   +   }
>   +
>   +   class MyObjectOutputStream extends ObjectOutputStream {
>   +           MyObjectOutputStream(OutputStream os) throws IOException {
>   +                   super(os);
>               }
>   -           public int read(byte bytes[], int off, int len) throws IOException {
>   -                   inputPos = Math.max(inputPos, firstRecordPos);
>   -                   int rc = IntegrityLog.this.read(inputPos, bytes, off, len);
>   -                   if( rc >= 0 )
>   -                           inputPos += rc;
>   -                   return rc;
>   +           protected void writeStreamHeader() {
>               }
>       }
>
>   -   class LogOutputStream extends OutputStream {
>   -           boolean closed = false;
>   -           public long getFilePointer() {
>   -                   return nextRecordPos;
>   -           }
>   -           public void close() throws IOException {
>   -                   super.close();
>   -                   closed = true;
>   +   class MyObjectInputStream extends ObjectInputStream {
>   +           MyObjectInputStream(InputStream is) throws IOException {
>   +                   super(is);
>               }
>   -           public void write(int b) throws IOException {
>   -                   IntegrityLog.this.write( (byte)b );
>   +           protected void readStreamHeader() {
>               }
>   -           public void write(byte bytes[], int off, int len) throws IOException 
>{
>   -                   IntegrityLog.this.write( bytes, off, len );
>   +   }
>   +
>   +  class MyInputStream extends InputStream{
>   +    public void close() throws IOException{
>   +    }
>   +           public int read() throws IOException {
>   +                   return raf.read( );
>               }
>   +           public int read(byte bytes[], int off, int len) throws IOException {
>   +                   return raf.read( bytes, off, len );
>   +           }
>   +  }
>   +
>   +   class MessageAddedRecord implements Serializable {
>   +           long messageId;
>   +           boolean isTransacted;
>   +           long transactionId;
>   +           SpyMessage message;
>   +           private final static long serialVersionUID = 235726945332013954L;
>       }
>
>   +   class MessageRemovedRecord implements Serializable {
>   +           boolean isTransacted;
>   +           long transactionId;
>   +           long messageId;
>   +           private final static long serialVersionUID = 235726945332013955L;
>   +   }
>
>       /////////////////////////////////////////////////////////////////////
>       // Constructor
>       /////////////////////////////////////////////////////////////////////
>       public IntegrityLog(String fileName) throws IOException {
>               f = new File(fileName);
>   -           boolean exists = f.isFile();
>   -
>               raf = new RandomAccessFile(f, "rw");
>   -           if( exists ) {
>   -                   loadHeader();
>   -           } else {
>   -                   initHeader();
>   -           }
>   +    this.objectOutput = new MyObjectOutputStream(new MyOutputStream());
>   +           seekEnd();
>       }
>
>       /////////////////////////////////////////////////////////////////////
>       // Public Methods
>       /////////////////////////////////////////////////////////////////////
>   -   public LogInputStream getInputStream() {
>   -           if ( inputStream==null || inputStream.closed ) {
>   -                   inputStream = new LogInputStream();
>   -           }
>   -           return inputStream;
>   -   }
>   -
>   -   public LogOutputStream getOutputStream() throws IOException {
>   -           if ( outputStream==null || outputStream.closed ) {
>   -                   outputStream = new LogOutputStream();
>   -           }
>   -           return outputStream;
>   -   }
>
>       public void commit() throws IOException {
>   -
>   -           headerBytes[0] = (byte)((firstRecordPos >>> 56) & 0xFF);
>   -           headerBytes[1] = (byte)((firstRecordPos >>> 48) & 0xFF);
>   -           headerBytes[2] = (byte)((firstRecordPos >>> 40) & 0xFF);
>   -           headerBytes[3] = (byte)((firstRecordPos >>> 32) & 0xFF);
>   -           headerBytes[4] = (byte)((firstRecordPos >>> 24) & 0xFF);
>   -           headerBytes[5] = (byte)((firstRecordPos >>> 16) & 0xFF);
>   -           headerBytes[6] = (byte)((firstRecordPos >>>  8) & 0xFF);
>   -           headerBytes[7] = (byte)((firstRecordPos >>>  0) & 0xFF);
>   -           headerBytes[8] = (byte)((nextRecordPos >>> 56) & 0xFF);
>   -           headerBytes[9] = (byte)((nextRecordPos >>> 48) & 0xFF);
>   -           headerBytes[10] =(byte)((nextRecordPos >>> 40) & 0xFF);
>   -           headerBytes[11] =(byte)((nextRecordPos >>> 32) & 0xFF);
>   -           headerBytes[12] =(byte)((nextRecordPos >>> 24) & 0xFF);
>   -           headerBytes[13] =(byte)((nextRecordPos >>> 16) & 0xFF);
>   -           headerBytes[14] =(byte)((nextRecordPos >>>  8) & 0xFF);
>   -           headerBytes[15] =(byte)((nextRecordPos >>>  0) & 0xFF);
>   -
>   -           raf.seek(0);
>   -           raf.write(headerBytes);
>   -   }
>   -
>   -   public void rollback() throws IOException {
>   -           loadHeader();
>   +    //raf.getFD().sync();
>       }
>
>      public void delete() throws IOException {
>   @@ -160,79 +111,132 @@
>               raf.close();
>               raf = null;
>       }
>   -
>   -   /////////////////////////////////////////////////////////////////////
>   -   // Private Methods
>   -   /////////////////////////////////////////////////////////////////////
>   -   private long getBytesLeft(long offset) {
>   -
>   -           return nextRecordPos-offset;
>   -
>   -   }
>   -
>   -   private void initHeader() throws IOException {
>   -
>   -           firstRecordPos = HEADER_SIZE;
>   -           nextRecordPos = HEADER_SIZE;
>   -
>   -           commit();
>   -   }
>   -
>   -   private void loadHeader() throws IOException {
>
>   -           raf.seek(0);
>   -           firstRecordPos = raf.readLong();
>   -           nextRecordPos = raf.readLong();
>   -
>   -   }
>   -
>   -   private int read(long offset) throws IOException {
>   -
>   -           if( offset >= nextRecordPos )
>   -                   return -1;
>   -
>   -           if( raf.getFilePointer() != offset ) {
>   -                   raf.seek(offset);
>   -           }
>   -
>   -           int rc = raf.read();
>   -           return rc;
>   -
>   -   }
>   -
>   -   private int read(long offset, byte bytes[], int off, int len) throws 
>IOException {
>   -
>   -           if( offset >= nextRecordPos )
>   -                   return -1;
>   +  protected static final byte TX = 0;
>   +  protected static final byte ADD = 1;
>   +  protected static final byte REMOVE = 2;
>   +
>   +  public synchronized void addTx(org.jbossmq.pm.Tx tx) throws 
>IOException{
>   +    raf.writeByte(TX);
>   +    raf.writeLong(tx.longValue());
>   +  }
>
>   -           len = (int)Math.min(len, getBytesLeft(offset));
>   +  public synchronized void add(long messageID, boolean isTransacted, 
>long txId, SpyMessage message)throws IOException{
>   +    raf.writeByte(ADD);
>   +    raf.writeLong(messageID);
>   +    raf.writeBoolean(isTransacted);
>   +    raf.writeLong(txId);
>   +    SpyMessage.writeMessage(message,objectOutput);
>   +    objectOutput.flush();
>   +  }
>
>   -           if( raf.getFilePointer() != offset ) {
>   -                   raf.seek(offset);
>   -           }
>   +  public synchronized void remove(long messageID, boolean isTransacted, 
>long txId)throws IOException{
>   +    raf.writeByte(REMOVE);
>   +    raf.writeLong(messageID);
>   +    raf.writeBoolean(isTransacted);
>   +    raf.writeLong(txId);
>   +  }
>
>   -           int rc = raf.read(bytes, off, len);
>   -           return rc;
>   +  public void skipNextEntry(ObjectInput in) throws IOException{
>   +    byte type = raf.readByte();
>   +    switch(type){
>   +      case TX:
>   +        raf.readLong();
>   +        return;
>   +      case ADD:
>   +        raf.readLong();
>   +        raf.readBoolean();
>   +        raf.readLong();
>   +        SpyMessage.readMessage(in);
>   +        return;
>   +      case REMOVE:
>   +        raf.readLong();
>   +        raf.readBoolean();
>   +        raf.readLong();
>   +        return;
>   +      default:
>   +        throw new java.io.IOException("Error in log file format.");
>   +    }
>   +  }
>
>   -   }
>   +  public java.util.LinkedList toIndex() throws IOException{
>   +    raf.seek(0);
>   +    long length = raf.length();
>   +    long pos = 0;
>   +    ObjectInput in = new MyObjectInputStream(new MyInputStream());
>   +    java.util.LinkedList ll = new java.util.LinkedList();
>   +    try{
>   +      while(pos < length){
>   +        ll.add(readNextEntry(in));
>   +        pos = raf.getFilePointer();
>   +      }
>   +    }catch(EOFException e){
>   +      //incomplete record
>   +    }
>   +    in.close();
>   +    raf.seek(pos);
>
>   -   private void write(byte []record, int off, int len) throws IOException 
>{
>   -           if( raf.getFilePointer() != nextRecordPos ) {
>   -                   raf.seek(nextRecordPos);
>   -           }
>   +    return ll;
>   +  }
>
>   -           raf.write(record, off, len);
>   -           nextRecordPos+=len;
>   -   }
>   +  public java.util.TreeSet toTreeSet() throws IOException{
>   +    raf.seek(0);
>   +    long length = raf.length();
>   +    long pos = 0;
>   +    ObjectInput in = new MyObjectInputStream(new MyInputStream());
>   +    java.util.TreeSet ll = new java.util.TreeSet();
>   +    try{
>   +      while(pos < length){
>   +        ll.add(readNextEntry(in));
>   +        pos = raf.getFilePointer();
>   +      }
>   +    }catch(EOFException e){
>   +      //incomplete record
>   +    }
>   +    in.close();
>   +    raf.seek(pos);
>
>   -   private void write(byte b) throws IOException {
>   +    return ll;
>   +  }
>
>   -           if( raf.getFilePointer() != nextRecordPos ) {
>   -                   raf.seek(nextRecordPos);
>   -           }
>   +  public Object readNextEntry(ObjectInput in) throws IOException{
>   +    byte type = raf.readByte();
>   +    switch(type){
>   +      case TX:
>   +        return new org.jbossmq.pm.Tx(raf.readLong());
>   +      case ADD:
>   +        MessageAddedRecord add = new MessageAddedRecord();
>   +        add.messageId = raf.readLong();
>   +        add.isTransacted = raf.readBoolean();
>   +        add.transactionId = raf.readLong();
>   +        add.message = SpyMessage.readMessage(in);
>   +        return add;
>   +      case REMOVE:
>   +        MessageRemovedRecord remove = new MessageRemovedRecord();
>   +        remove.messageId = raf.readLong();
>   +        remove.isTransacted = raf.readBoolean();
>   +        remove.transactionId = raf.readLong();
>   +        return remove;
>   +      default:
>   +        throw new java.io.IOException("Error in log file format.");
>   +    }
>   +  }
>
>   -           raf.write(b);
>   -           nextRecordPos++;
>   +   private void seekEnd() throws IOException {
>   +    raf.seek(0);
>   +    long length = raf.length();
>   +    long pos = 0;
>   +    ObjectInput in = new MyObjectInputStream(new MyInputStream());
>   +    try{
>   +      while(pos < length){
>   +        skipNextEntry(in);
>   +        pos = raf.getFilePointer();
>   +      }
>   +    }catch(EOFException e){
>   +      //incomplete record, must have been due to program failure during 
>write.
>   +    }
>   +    in.close();
>   +    raf.seek(pos);
>       }
>
>    }
>
>
>
>   1.3       +68 -19    
>jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManager.java
>
>   Index: PersistenceManager.java
>   ===================================================================
>   RCS file: 
>/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManager.java,v
>   retrieving revision 1.2
>   retrieving revision 1.3
>   diff -u -r1.2 -r1.3
>   --- PersistenceManager.java 2001/08/01 20:22:35     1.2
>   +++ PersistenceManager.java 2001/08/09 01:18:28     1.3
>   @@ -14,8 +14,8 @@
>    import org.jbossmq.server.JMSDestination;
>    import org.jbossmq.SpyMessage;
>    import org.jbossmq.SpyDestination;
>   +import org.jbossmq.SpyJMSException;
>
>   -
>    import javax.naming.InitialContext;
>    import org.jbossmq.pm.TxManager;
>    import org.jboss.util.ServiceMBeanSupport;
>   @@ -25,9 +25,9 @@
>    /**
>     * This class manages all persistence related services.
>     *
>   - * @author David Maplesden
>   + * @author David Maplesden ([EMAIL PROTECTED])
>     *
>   - * @version $Revision: 1.2 $
>   + * @version $Revision: 1.3 $
>     */
>    public class PersistenceManager
>       extends ServiceMBeanSupport
>   @@ -71,28 +71,69 @@
>       }
>
>      static class TxInfo {
>   -    Long txId;
>   +    org.jbossmq.pm.Tx txId;
>        LinkedList addMessages = new LinkedList();
>        LinkedList ackMessages = new LinkedList();
>        SpyTxLog log;
>   -    TxInfo(Long txId,SpyTxLog log){
>   +    TxInfo(org.jbossmq.pm.Tx txId,SpyTxLog log){
>          this.txId = txId;
>          this.log = log;
>        }
>      }
>   +
>   +  protected java.util.ArrayList listPool = new java.util.ArrayList();
>   +  protected java.util.ArrayList txPool = new java.util.ArrayList();
>   +
>   +  protected static int MAX_POOL_SIZE = 50;
>   +
>   +  protected TxInfo getTxInfo(org.jbossmq.pm.Tx txId, SpyTxLog txLog){
>   +    if(listPool.isEmpty()){
>   +      return new TxInfo(txId,txLog);
>   +    }else{
>   +      TxInfo info = (TxInfo)listPool.remove(listPool.size()-1);
>   +      info.txId = txId;
>   +      info.log = txLog;
>   +      return info;
>   +    }
>   +  }
>   +
>   +  protected void releaseTxInfo(TxInfo list){
>   +    if(listPool.size() < MAX_POOL_SIZE){
>   +      list.ackMessages.clear();
>   +      list.addMessages.clear();
>   +      listPool.add(list);
>   +    }
>   +  }
>
>   -   public Long createPersistentTx() throws javax.jms.JMSException {
>   -    Long txId = null;
>   +  protected org.jbossmq.pm.Tx getTx(long value){
>   +    if(txPool.isEmpty()){
>   +      return new org.jbossmq.pm.Tx(value);
>   +    }else{
>   +      org.jbossmq.pm.Tx tx = 
>(org.jbossmq.pm.Tx)txPool.remove(listPool.size()-1);
>   +      tx.setValue(value);
>   +      return tx;
>   +    }
>   +  }
>   +
>   +  protected void releaseTx(org.jbossmq.pm.Tx tx){
>   +    if(txPool.size() < MAX_POOL_SIZE){
>   +      txPool.add(tx);
>   +    }
>   +  }
>   +
>   +   public org.jbossmq.pm.Tx createPersistentTx() throws 
>javax.jms.JMSException {
>   +    org.jbossmq.pm.Tx txId = null;
>        SpyTxLog txLog = currentTxLog;
>        synchronized(transToTxLogs){
>   -      txId = new Long(++nextTxId);
>   -      transToTxLogs.put(txId,new TxInfo(txId,txLog));
>   +      txId = getTx(++nextTxId);
>   +      transToTxLogs.put(txId,getTxInfo(txId,txLog));
>        }
>        txLog.createTx();
>        return txId;
>       }
>
>   -   public void commitPersistentTx(Long txId) throws 
>javax.jms.JMSException {
>   +   public void commitPersistentTx(org.jbossmq.pm.Tx txId) throws 
>javax.jms.JMSException {
>   +//System.out.println("Committing TX 
>"+Long.toHexString(txId.longValue()));
>        TxInfo info = null;
>        LinkedList messagesToDelete = null;
>        synchronized(transToTxLogs){
>   @@ -101,10 +142,14 @@
>        }
>        deleteMessages(messagesToDelete);
>               info.log.commitTx(txId);
>   +    synchronized(transToTxLogs){
>   +      releaseTx(txId);
>   +      releaseTxInfo(info);
>   +    }
>        checkCleanup(info.log);
>       }
>
>   -   public void rollbackPersistentTx(Long txId) throws 
>javax.jms.JMSException {
>   +   public void rollbackPersistentTx(org.jbossmq.pm.Tx txId) throws 
>javax.jms.JMSException {
>        TxInfo info = null;
>        LinkedList messagesToDelete = null;
>        synchronized(transToTxLogs){
>   @@ -113,6 +158,10 @@
>        }
>        deleteMessages(messagesToDelete);
>               info.log.rollbackTx(txId);
>   +    synchronized(transToTxLogs){
>   +      releaseTx(txId);
>   +      releaseTxInfo(info);
>   +    }
>        checkCleanup(info.log);
>       }
>
>   @@ -160,7 +209,7 @@
>          }
>          checkCleanup(oldLog);
>        }catch(java.net.MalformedURLException e){
>   -      JMSException jme = new JMSException("Error rolling over logs to 
>new files.");
>   +      JMSException jme = new SpyJMSException("Error rolling over logs 
>to new files.");
>          jme.setLinkedException(e);
>          throw jme;
>        }
>   @@ -224,7 +273,7 @@
>            log = (SpyMessageLog)logs.remove(key);
>          }
>          if( log == null )
>   -        throw new JMSException("The persistence log was never 
>initialized");
>   +        throw new SpyJMSException("The persistence log was never 
>initialized");
>          log.close();
>          log.delete();
>
>   @@ -257,8 +306,8 @@
>
>       }
>
>   -   public void add(org.jbossmq.SpyMessage message, Long txId) throws 
>javax.jms.JMSException {
>   -
>   +   public void add(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx 
>txId) throws javax.jms.JMSException {
>   +//System.out.println("Add message 
>"+Long.toHexString(message.messageId)+" in trans 
>"+Long.toHexString(txId.longValue())+" to "+message.getJMSDestination());
>               LogInfo logInfo;
>
>        SpyTxLog txLog = null;
>   @@ -275,7 +324,7 @@
>          logs = (HashMap)messageLogs.get(txLog);
>               }
>        synchronized(logs){
>   -                   logInfo = (LogInfo) logs.get(""+message.getJMSDestination());
>   +                   logInfo = (LogInfo) 
>logs.get(message.getJMSDestination().toString());
>               }
>
>               if (logInfo == null)
>   @@ -295,14 +344,15 @@
>        checkRollOver();
>       }
>
>   -   public void remove(org.jbossmq.SpyMessage message, Long txId) throws 
>javax.jms.JMSException {
>   +   public void remove(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx 
>txId) throws javax.jms.JMSException {
>   +//System.out.println("Removing message 
>"+Long.toHexString(message.messageId)+" in trans 
>"+Long.toHexString(txId.longValue())+" from "+message.getJMSDestination());
>
>               LogInfo logInfo;
>
>        SpyTxLog txLog = ((LogInfo)message.persistData).txLog;
>               synchronized (messageLogs) {
>          HashMap logs = (HashMap)messageLogs.get(txLog);
>   -                   logInfo = (LogInfo) logs.get(""+message.getJMSDestination());
>   +                   logInfo = (LogInfo) 
>logs.get(message.getJMSDestination().toString());
>               }
>
>               if (logInfo == null)
>   @@ -354,7 +404,6 @@
>        if(DEBUG) System.out.println("Using new rolling logged persistence 
>manager.");
>
>               URL configFile = 
>getClass().getClassLoader().getResource("jboss.jcml");
>   -
>               dataDirURL = new URL(configFile, dataDirectory);
>
>           //Get an InitialContext
>   @@ -386,7 +435,7 @@
>        }
>
>        if(!commitedTxs.isEmpty())
>   -      nextTxId = ((Long)commitedTxs.last()).longValue();
>   +      nextTxId = ((org.jbossmq.pm.Tx)commitedTxs.last()).longValue();
>
>        for(int i=0;i<dataFiles.length;++i){
>          String name = dataFiles[i].getName();
>
>
>
>   1.2       +0 -0      
>jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManagerMBean.java
>
>   Index: PersistenceManagerMBean.java
>   ===================================================================
>   RCS file: 
>/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManagerMBean.java,v
>   retrieving revision 1.1
>   retrieving revision 1.2
>   diff -u -r1.1 -r1.2
>   --- PersistenceManagerMBean.java    2001/07/31 20:17:52     1.1
>   +++ PersistenceManagerMBean.java    2001/08/09 01:18:28     1.2
>   @@ -13,7 +13,7 @@
>     *
>     *   @see <related>
>     *   @author Vincent Sheffer ([EMAIL PROTECTED])
>   - *   @version $Revision: 1.1 $
>   + *   @version $Revision: 1.2 $
>     */
>    public interface PersistenceManagerMBean
>       extends org.jboss.util.ServiceMBean
>
>
>
>   1.2       +23 -50    
>jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyMessageLog.java
>
>   Index: SpyMessageLog.java
>   ===================================================================
>   RCS file: 
>/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyMessageLog.java,v
>   retrieving revision 1.1
>   retrieving revision 1.2
>   diff -u -r1.1 -r1.2
>   --- SpyMessageLog.java      2001/07/31 20:17:52     1.1
>   +++ SpyMessageLog.java      2001/08/09 01:18:28     1.2
>   @@ -6,6 +6,8 @@
>     */
>    package org.jbossmq.pm.rollinglogged;
>
>   +import org.jbossmq.SpyJMSException;
>   +
>    import java.io.IOException;
>    import java.io.Serializable;
>    import javax.jms.JMSException;
>   @@ -18,42 +20,21 @@
>     * provider failure.  Integrety is kept by the use of an 
>ObjectIntegrityLog.
>     *
>     * @author: Hiram Chirino ([EMAIL PROTECTED])
>   - * @version $Revision: 1.1 $
>   + * @version $Revision: 1.2 $
>     */
>    public class SpyMessageLog {
>
>       /////////////////////////////////////////////////////////////////////
>       // Attributes
>   -   /////////////////////////////////////////////////////////////////////
>   -   private ObjectIntegrityLog transactionLog;
>   -   private MessageAddedRecord messageAddedRecord = new 
>MessageAddedRecord();
>   -   private MessageRemovedRecord messageRemovedRecord = new 
>MessageRemovedRecord();
>   -
>       /////////////////////////////////////////////////////////////////////
>   -   // Helper Inner Classes
>   -   /////////////////////////////////////////////////////////////////////
>   -   static class MessageAddedRecord implements Serializable {
>   -           long messageId;
>   -           boolean isTransacted;
>   -           long transactionId;
>   -           SpyMessage message;
>   -           private final static long serialVersionUID = 235726945332013954L;
>   -   }
>   +   private IntegrityLog transactionLog;
>
>   -   static class MessageRemovedRecord implements Serializable {
>   -           boolean isTransacted;
>   -           long transactionId;
>   -           long messageId;
>   -           private final static long serialVersionUID = 235726945332013955L;
>   -   }
>   -
>   -
>       /////////////////////////////////////////////////////////////////////
>       // Constructor
>       /////////////////////////////////////////////////////////////////////
>       public SpyMessageLog(String fileName) throws JMSException {
>               try {
>   -                   transactionLog = new ObjectIntegrityLog(fileName);
>   +                   transactionLog = new IntegrityLog(fileName);
>               } catch ( IOException e ) {
>                       throwJMSException("Could not open the queue's tranaction log: 
>"+fileName,e);
>               }
>   @@ -79,19 +60,14 @@
>               }
>      }
>
>   -   synchronized public void add( SpyMessage message, Long transactionId ) 
>throws JMSException {
>   +   synchronized public void add( SpyMessage message, org.jbossmq.pm.Tx 
>transactionId ) throws JMSException {
>               try{
>
>   -                   messageAddedRecord.message = message;
>   -                   messageAddedRecord.messageId = message.messageId;
>                       if( transactionId == null )     {
>   -                           messageAddedRecord.isTransacted = false;
>   +        transactionLog.add(message.messageId,false,-1,message);
>                       } else {
>   -                           messageAddedRecord.isTransacted = true;
>   -                           messageAddedRecord.transactionId = 
>transactionId.longValue();
>   +        
>transactionLog.add(message.messageId,true,transactionId.longValue(),message);
>                       }
>   -
>   -                   transactionLog.add(messageAddedRecord);
>                       transactionLog.commit();
>
>               } catch ( IOException e ) {
>   @@ -100,17 +76,14 @@
>
>       }
>
>   -   synchronized public void remove( SpyMessage message, Long 
>transactionId ) throws JMSException {
>   +   synchronized public void remove( SpyMessage message, org.jbossmq.pm.Tx 
>transactionId ) throws JMSException {
>               try{
>
>   -                   messageRemovedRecord.messageId = message.messageId;
>                       if( transactionId == null ) {
>   -                           messageRemovedRecord.isTransacted = false;
>   +        transactionLog.remove(message.messageId,false,-1);
>                       } else {
>   -                           messageRemovedRecord.isTransacted = true;
>   -                           messageRemovedRecord.transactionId = 
>transactionId.longValue();
>   +        
>transactionLog.remove(message.messageId,true,transactionId.longValue());
>                       }
>   -                   transactionLog.add(messageRemovedRecord);
>                       transactionLog.commit();
>
>               } catch ( IOException e ) {
>   @@ -124,29 +97,29 @@
>               java.util.HashMap messageIndex = new java.util.HashMap();
>
>               try {
>   -                   ObjectIntegrityLog.IndexItem objects[] = 
>transactionLog.toIndex();
>   +                   java.util.LinkedList objects = transactionLog.toIndex();
>
>   -                   for( int i=0; i < objects.length; i++ ) {
>   +                   for(java.util.Iterator it = objects.iterator();it.hasNext(); ) 
>{
>
>   -                           Object o = objects[i].record;
>   -                           if( o instanceof MessageAddedRecord ) {
>   +                           Object o = it.next();
>   +                           if( o instanceof IntegrityLog.MessageAddedRecord ) {
>
>   -                                   MessageAddedRecord r = (MessageAddedRecord)o;
>   +                                   IntegrityLog.MessageAddedRecord r = 
>(IntegrityLog.MessageAddedRecord)o;
>                                       r.message.messageId = r.messageId;
>
>   -                                   if( r.isTransacted && !commited.contains(new 
>Long(r.transactionId)) ) {
>   +                                   if( r.isTransacted && !commited.contains(new 
>org.jbossmq.pm.Tx(r.transactionId)) ) {
>                                               // the TX this message was part of was 
>not
>                                               // commited... so drop this message
>                                               continue;
>                                       }
>
>   -                                   messageIndex.put( new Long(r.messageId), 
>objects[i]);
>   +                                   messageIndex.put( new Long(r.messageId), o);
>
>   -                           } else if( o instanceof MessageRemovedRecord ) {
>   +                           } else if( o instanceof 
>IntegrityLog.MessageRemovedRecord ) {
>
>   -                                   MessageRemovedRecord r = 
>(MessageRemovedRecord)o;
>   +                                   IntegrityLog.MessageRemovedRecord r = 
>(IntegrityLog.MessageRemovedRecord)o;
>
>   -                                   if( r.isTransacted && !commited.contains(new 
>Long(r.transactionId)) ) {
>   +                                   if( r.isTransacted && !commited.contains(new 
>org.jbossmq.pm.Tx(r.transactionId)) ) {
>                                               // the TX this message was part of was 
>not
>                                               // commited... so drop this message
>                                               continue;
>   @@ -158,20 +131,20 @@
>
>                       }
>               } catch ( Exception e ) {
>   +//      e.printStackTrace();
>                       throwJMSException("Could not rebuild the queue from the 
>queue's 
>tranaction log.",e);
>               }
>
>               SpyMessage rc[] = new SpyMessage[messageIndex.size()];
>               java.util.Iterator iter = messageIndex.values().iterator();
>               for( int i=0; iter.hasNext(); i++ ) {
>   -                   ObjectIntegrityLog.IndexItem item = 
>(ObjectIntegrityLog.IndexItem)iter.next();
>   -                   rc[i] = ((MessageAddedRecord)item.record).message;
>   +                   rc[i] = ((IntegrityLog.MessageAddedRecord)iter.next()).message;
>               }
>               return rc;
>       }
>
>       private void throwJMSException(String message, Exception e) throws 
>JMSException {
>   -           JMSException newE = new JMSException(message);
>   +           JMSException newE = new SpyJMSException(message);
>               newE.setLinkedException(e);
>               throw newE;
>       }
>
>
>
>   1.2       +8 -6      
>jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyTxLog.java
>
>   Index: SpyTxLog.java
>   ===================================================================
>   RCS file: 
>/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyTxLog.java,v
>   retrieving revision 1.1
>   retrieving revision 1.2
>   diff -u -r1.1 -r1.2
>   --- SpyTxLog.java   2001/07/31 20:17:52     1.1
>   +++ SpyTxLog.java   2001/08/09 01:18:28     1.2
>   @@ -6,6 +6,8 @@
>     */
>    package org.jbossmq.pm.rollinglogged;
>
>   +import org.jbossmq.SpyJMSException;
>   +
>    import java.io.Serializable;
>    import java.io.IOException;
>
>   @@ -15,14 +17,14 @@
>     * This is used to keep a log of commited transactions.
>     *
>     * @author: Hiram Chirino ([EMAIL PROTECTED])
>   - * @version $Revision: 1.1 $
>   + * @version $Revision: 1.2 $
>     */
>    public class SpyTxLog {
>
>       /////////////////////////////////////////////////////////////////////
>       // Attributes
>       /////////////////////////////////////////////////////////////////////
>   -   private ObjectIntegrityLog transactionLog;
>   +   private IntegrityLog transactionLog;
>      private int liveTransactionCount = 0;
>      private Object counterLock = new Object();
>
>   @@ -31,7 +33,7 @@
>       /////////////////////////////////////////////////////////////////////
>       public SpyTxLog(String fileName) throws JMSException {
>               try {
>   -                   transactionLog = new ObjectIntegrityLog(fileName);
>   +                   transactionLog = new IntegrityLog(fileName);
>               } catch (IOException e) {
>                       throwJMSException("Could not open the queue's tranaction log: 
>" + 
>fileName, e);
>               }
>   @@ -69,10 +71,10 @@
>        }
>       }
>
>   -   synchronized public void commitTx(Long id) throws JMSException {
>   +   synchronized public void commitTx(org.jbossmq.pm.Tx id) throws 
>JMSException {
>
>               try {
>   -                   transactionLog.add(id);
>   +                   transactionLog.addTx(id);
>                       transactionLog.commit();
>          synchronized(counterLock){
>            --liveTransactionCount;
>   @@ -91,7 +93,7 @@
>               }
>       }
>
>   -   public void rollbackTx(Long txId) throws JMSException {
>   +   public void rollbackTx(org.jbossmq.pm.Tx txId) throws JMSException {
>        synchronized(counterLock){
>          --liveTransactionCount;
>        }
>   @@ -101,7 +103,7 @@
>       // Private Methods
>       /////////////////////////////////////////////////////////////////////
>       private void throwJMSException(String message, Exception e) throws 
>JMSException {
>   -           JMSException newE = new JMSException(message);
>   +           JMSException newE = new SpyJMSException(message);
>               newE.setLinkedException(e);
>               throw newE;
>       }
>
>
>
>
>_______________________________________________
>Jboss-development mailing list
>[EMAIL PROTECTED]
>http://lists.sourceforge.net/lists/listinfo/jboss-development


_________________________________________________________________
Get your FREE download of MSN Explorer at http://explorer.msn.com/intl.asp


_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to