WOW!!! Now this gets me excited! A rolling log file based persistence
manager! This should give us the performance edge of using log files for
persistence but remove all the scalability problems of a linear logger.
Good Work Paul!
Hiram
>From: Paul Kendall <[EMAIL PROTECTED]>
>Reply-To: [EMAIL PROTECTED]
>To: [EMAIL PROTECTED]
>Subject: [JBoss-dev] CVS update:
>jbossmq/src/main/org/jbossmq/pm/rollinglogged IntegrityLog.java
>PersistenceManager.java PersistenceManagerMBean.java SpyMessageLog.java
>SpyTxLog.java ObjectIntegrityLog.java
>Date: Wed, 08 Aug 2001 18:18:28 -0700
>
> User: pkendall
> Date: 01/08/08 18:18:28
>
> Modified: src/main/org/jbossmq/pm/rollinglogged IntegrityLog.java
> PersistenceManager.java
> PersistenceManagerMBean.java SpyMessageLog.java
> SpyTxLog.java
> Removed: src/main/org/jbossmq/pm/rollinglogged
> ObjectIntegrityLog.java
> Log:
> Major updates (especially to topics).
> Speed improvements.
> Make JVM IL work (by using a singleton JMSServer).
> Message Listeners re-implemented using client-side thread.
>
> Revision Changes Path
> 1.2 +173 -169
>jbossmq/src/main/org/jbossmq/pm/rollinglogged/IntegrityLog.java
>
> Index: IntegrityLog.java
> ===================================================================
> RCS file:
>/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/IntegrityLog.java,v
> retrieving revision 1.1
> retrieving revision 1.2
> diff -u -r1.1 -r1.2
> --- IntegrityLog.java 2001/07/31 20:17:52 1.1
> +++ IntegrityLog.java 2001/08/09 01:18:28 1.2
> @@ -6,150 +6,101 @@
> */
> package org.jbossmq.pm.rollinglogged;
>
> -import java.io.RandomAccessFile;
> -import java.io.OutputStream;
> -import java.io.InputStream;
> -import java.io.IOException;
> -import java.io.File;
> +import java.io.*;
>
> -
> +import org.jbossmq.*;
> /**
> * This class is used to create a log file which which will will
>garantee
> - * it's integrety up to the last commit point.
> - *
> - * The InputStream returned by getInputStream() will read
> - * data placed into the log with the OutputStream returned by
> - * getOutputStream(). The EOF for the InputStream is the
> - * last commited point of the OutputStream.
> + * it's integrety up to the last commit point. An optimised version of
>the
> + * integrityLog in the logged persistence.
> *
> - * @author: Hiram Chirino ([EMAIL PROTECTED])
> - * @version $Revision: 1.1 $
> + * @author: David Maplesden ([EMAIL PROTECTED])
> + * @version $Revision: 1.2 $
> */
> -public class IntegrityLog {
> +public class IntegrityLog{
>
> /////////////////////////////////////////////////////////////////////
> // Attributes
> /////////////////////////////////////////////////////////////////////
> - private static final int HEADER_SIZE=16; // in bytes
> -
> - // Header related stuff
> - private long firstRecordPos;
> - private long nextRecordPos;
> - private byte headerBytes[]=new byte[HEADER_SIZE];
>
> private RandomAccessFile raf;
> private File f;
> -
> - private LogOutputStream outputStream;
> - private LogInputStream inputStream;
> -
> -
> - /////////////////////////////////////////////////////////////////////
> - // Helper Inner Classes
> - /////////////////////////////////////////////////////////////////////
> - class LogInputStream extends InputStream {
> -
> - boolean closed = false;
> - long inputPos = 0;
> + private ObjectOutput objectOutput;
>
> - public long getFilePointer() {
> - return inputPos;
> + ////////////////////////////////////////////////////////////
> + // Helper Inner classes. //
> + ////////////////////////////////////////////////////////////
> +
> + class MyOutputStream extends OutputStream{
> + public void close() throws IOException{
> + flush();
> + }
> + public void write(int b) throws IOException {
> + raf.write( (byte)b );
> }
> - public void close() throws IOException {
> - super.close();
> - closed = true;
> + public void write(byte bytes[], int off, int len) throws IOException
>{
> + raf.write( bytes, off, len );
> }
> - public int read() throws IOException {
> - inputPos = Math.max(inputPos, firstRecordPos);
> - int rc = IntegrityLog.this.read(inputPos);
> - if( rc >= 0 )
> - inputPos ++;
> - return rc;
> + }
> +
> + class MyObjectOutputStream extends ObjectOutputStream {
> + MyObjectOutputStream(OutputStream os) throws IOException {
> + super(os);
> }
> - public int read(byte bytes[], int off, int len) throws IOException {
> - inputPos = Math.max(inputPos, firstRecordPos);
> - int rc = IntegrityLog.this.read(inputPos, bytes, off, len);
> - if( rc >= 0 )
> - inputPos += rc;
> - return rc;
> + protected void writeStreamHeader() {
> }
> }
>
> - class LogOutputStream extends OutputStream {
> - boolean closed = false;
> - public long getFilePointer() {
> - return nextRecordPos;
> - }
> - public void close() throws IOException {
> - super.close();
> - closed = true;
> + class MyObjectInputStream extends ObjectInputStream {
> + MyObjectInputStream(InputStream is) throws IOException {
> + super(is);
> }
> - public void write(int b) throws IOException {
> - IntegrityLog.this.write( (byte)b );
> + protected void readStreamHeader() {
> }
> - public void write(byte bytes[], int off, int len) throws IOException
>{
> - IntegrityLog.this.write( bytes, off, len );
> + }
> +
> + class MyInputStream extends InputStream{
> + public void close() throws IOException{
> + }
> + public int read() throws IOException {
> + return raf.read( );
> }
> + public int read(byte bytes[], int off, int len) throws IOException {
> + return raf.read( bytes, off, len );
> + }
> + }
> +
> + class MessageAddedRecord implements Serializable {
> + long messageId;
> + boolean isTransacted;
> + long transactionId;
> + SpyMessage message;
> + private final static long serialVersionUID = 235726945332013954L;
> }
>
> + class MessageRemovedRecord implements Serializable {
> + boolean isTransacted;
> + long transactionId;
> + long messageId;
> + private final static long serialVersionUID = 235726945332013955L;
> + }
>
> /////////////////////////////////////////////////////////////////////
> // Constructor
> /////////////////////////////////////////////////////////////////////
> public IntegrityLog(String fileName) throws IOException {
> f = new File(fileName);
> - boolean exists = f.isFile();
> -
> raf = new RandomAccessFile(f, "rw");
> - if( exists ) {
> - loadHeader();
> - } else {
> - initHeader();
> - }
> + this.objectOutput = new MyObjectOutputStream(new MyOutputStream());
> + seekEnd();
> }
>
> /////////////////////////////////////////////////////////////////////
> // Public Methods
> /////////////////////////////////////////////////////////////////////
> - public LogInputStream getInputStream() {
> - if ( inputStream==null || inputStream.closed ) {
> - inputStream = new LogInputStream();
> - }
> - return inputStream;
> - }
> -
> - public LogOutputStream getOutputStream() throws IOException {
> - if ( outputStream==null || outputStream.closed ) {
> - outputStream = new LogOutputStream();
> - }
> - return outputStream;
> - }
>
> public void commit() throws IOException {
> -
> - headerBytes[0] = (byte)((firstRecordPos >>> 56) & 0xFF);
> - headerBytes[1] = (byte)((firstRecordPos >>> 48) & 0xFF);
> - headerBytes[2] = (byte)((firstRecordPos >>> 40) & 0xFF);
> - headerBytes[3] = (byte)((firstRecordPos >>> 32) & 0xFF);
> - headerBytes[4] = (byte)((firstRecordPos >>> 24) & 0xFF);
> - headerBytes[5] = (byte)((firstRecordPos >>> 16) & 0xFF);
> - headerBytes[6] = (byte)((firstRecordPos >>> 8) & 0xFF);
> - headerBytes[7] = (byte)((firstRecordPos >>> 0) & 0xFF);
> - headerBytes[8] = (byte)((nextRecordPos >>> 56) & 0xFF);
> - headerBytes[9] = (byte)((nextRecordPos >>> 48) & 0xFF);
> - headerBytes[10] =(byte)((nextRecordPos >>> 40) & 0xFF);
> - headerBytes[11] =(byte)((nextRecordPos >>> 32) & 0xFF);
> - headerBytes[12] =(byte)((nextRecordPos >>> 24) & 0xFF);
> - headerBytes[13] =(byte)((nextRecordPos >>> 16) & 0xFF);
> - headerBytes[14] =(byte)((nextRecordPos >>> 8) & 0xFF);
> - headerBytes[15] =(byte)((nextRecordPos >>> 0) & 0xFF);
> -
> - raf.seek(0);
> - raf.write(headerBytes);
> - }
> -
> - public void rollback() throws IOException {
> - loadHeader();
> + //raf.getFD().sync();
> }
>
> public void delete() throws IOException {
> @@ -160,79 +111,132 @@
> raf.close();
> raf = null;
> }
> -
> - /////////////////////////////////////////////////////////////////////
> - // Private Methods
> - /////////////////////////////////////////////////////////////////////
> - private long getBytesLeft(long offset) {
> -
> - return nextRecordPos-offset;
> -
> - }
> -
> - private void initHeader() throws IOException {
> -
> - firstRecordPos = HEADER_SIZE;
> - nextRecordPos = HEADER_SIZE;
> -
> - commit();
> - }
> -
> - private void loadHeader() throws IOException {
>
> - raf.seek(0);
> - firstRecordPos = raf.readLong();
> - nextRecordPos = raf.readLong();
> -
> - }
> -
> - private int read(long offset) throws IOException {
> -
> - if( offset >= nextRecordPos )
> - return -1;
> -
> - if( raf.getFilePointer() != offset ) {
> - raf.seek(offset);
> - }
> -
> - int rc = raf.read();
> - return rc;
> -
> - }
> -
> - private int read(long offset, byte bytes[], int off, int len) throws
>IOException {
> -
> - if( offset >= nextRecordPos )
> - return -1;
> + protected static final byte TX = 0;
> + protected static final byte ADD = 1;
> + protected static final byte REMOVE = 2;
> +
> + public synchronized void addTx(org.jbossmq.pm.Tx tx) throws
>IOException{
> + raf.writeByte(TX);
> + raf.writeLong(tx.longValue());
> + }
>
> - len = (int)Math.min(len, getBytesLeft(offset));
> + public synchronized void add(long messageID, boolean isTransacted,
>long txId, SpyMessage message)throws IOException{
> + raf.writeByte(ADD);
> + raf.writeLong(messageID);
> + raf.writeBoolean(isTransacted);
> + raf.writeLong(txId);
> + SpyMessage.writeMessage(message,objectOutput);
> + objectOutput.flush();
> + }
>
> - if( raf.getFilePointer() != offset ) {
> - raf.seek(offset);
> - }
> + public synchronized void remove(long messageID, boolean isTransacted,
>long txId)throws IOException{
> + raf.writeByte(REMOVE);
> + raf.writeLong(messageID);
> + raf.writeBoolean(isTransacted);
> + raf.writeLong(txId);
> + }
>
> - int rc = raf.read(bytes, off, len);
> - return rc;
> + public void skipNextEntry(ObjectInput in) throws IOException{
> + byte type = raf.readByte();
> + switch(type){
> + case TX:
> + raf.readLong();
> + return;
> + case ADD:
> + raf.readLong();
> + raf.readBoolean();
> + raf.readLong();
> + SpyMessage.readMessage(in);
> + return;
> + case REMOVE:
> + raf.readLong();
> + raf.readBoolean();
> + raf.readLong();
> + return;
> + default:
> + throw new java.io.IOException("Error in log file format.");
> + }
> + }
>
> - }
> + public java.util.LinkedList toIndex() throws IOException{
> + raf.seek(0);
> + long length = raf.length();
> + long pos = 0;
> + ObjectInput in = new MyObjectInputStream(new MyInputStream());
> + java.util.LinkedList ll = new java.util.LinkedList();
> + try{
> + while(pos < length){
> + ll.add(readNextEntry(in));
> + pos = raf.getFilePointer();
> + }
> + }catch(EOFException e){
> + //incomplete record
> + }
> + in.close();
> + raf.seek(pos);
>
> - private void write(byte []record, int off, int len) throws IOException
>{
> - if( raf.getFilePointer() != nextRecordPos ) {
> - raf.seek(nextRecordPos);
> - }
> + return ll;
> + }
>
> - raf.write(record, off, len);
> - nextRecordPos+=len;
> - }
> + public java.util.TreeSet toTreeSet() throws IOException{
> + raf.seek(0);
> + long length = raf.length();
> + long pos = 0;
> + ObjectInput in = new MyObjectInputStream(new MyInputStream());
> + java.util.TreeSet ll = new java.util.TreeSet();
> + try{
> + while(pos < length){
> + ll.add(readNextEntry(in));
> + pos = raf.getFilePointer();
> + }
> + }catch(EOFException e){
> + //incomplete record
> + }
> + in.close();
> + raf.seek(pos);
>
> - private void write(byte b) throws IOException {
> + return ll;
> + }
>
> - if( raf.getFilePointer() != nextRecordPos ) {
> - raf.seek(nextRecordPos);
> - }
> + public Object readNextEntry(ObjectInput in) throws IOException{
> + byte type = raf.readByte();
> + switch(type){
> + case TX:
> + return new org.jbossmq.pm.Tx(raf.readLong());
> + case ADD:
> + MessageAddedRecord add = new MessageAddedRecord();
> + add.messageId = raf.readLong();
> + add.isTransacted = raf.readBoolean();
> + add.transactionId = raf.readLong();
> + add.message = SpyMessage.readMessage(in);
> + return add;
> + case REMOVE:
> + MessageRemovedRecord remove = new MessageRemovedRecord();
> + remove.messageId = raf.readLong();
> + remove.isTransacted = raf.readBoolean();
> + remove.transactionId = raf.readLong();
> + return remove;
> + default:
> + throw new java.io.IOException("Error in log file format.");
> + }
> + }
>
> - raf.write(b);
> - nextRecordPos++;
> + private void seekEnd() throws IOException {
> + raf.seek(0);
> + long length = raf.length();
> + long pos = 0;
> + ObjectInput in = new MyObjectInputStream(new MyInputStream());
> + try{
> + while(pos < length){
> + skipNextEntry(in);
> + pos = raf.getFilePointer();
> + }
> + }catch(EOFException e){
> + //incomplete record, must have been due to program failure during
>write.
> + }
> + in.close();
> + raf.seek(pos);
> }
>
> }
>
>
>
> 1.3 +68 -19
>jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManager.java
>
> Index: PersistenceManager.java
> ===================================================================
> RCS file:
>/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManager.java,v
> retrieving revision 1.2
> retrieving revision 1.3
> diff -u -r1.2 -r1.3
> --- PersistenceManager.java 2001/08/01 20:22:35 1.2
> +++ PersistenceManager.java 2001/08/09 01:18:28 1.3
> @@ -14,8 +14,8 @@
> import org.jbossmq.server.JMSDestination;
> import org.jbossmq.SpyMessage;
> import org.jbossmq.SpyDestination;
> +import org.jbossmq.SpyJMSException;
>
> -
> import javax.naming.InitialContext;
> import org.jbossmq.pm.TxManager;
> import org.jboss.util.ServiceMBeanSupport;
> @@ -25,9 +25,9 @@
> /**
> * This class manages all persistence related services.
> *
> - * @author David Maplesden
> + * @author David Maplesden ([EMAIL PROTECTED])
> *
> - * @version $Revision: 1.2 $
> + * @version $Revision: 1.3 $
> */
> public class PersistenceManager
> extends ServiceMBeanSupport
> @@ -71,28 +71,69 @@
> }
>
> static class TxInfo {
> - Long txId;
> + org.jbossmq.pm.Tx txId;
> LinkedList addMessages = new LinkedList();
> LinkedList ackMessages = new LinkedList();
> SpyTxLog log;
> - TxInfo(Long txId,SpyTxLog log){
> + TxInfo(org.jbossmq.pm.Tx txId,SpyTxLog log){
> this.txId = txId;
> this.log = log;
> }
> }
> +
> + protected java.util.ArrayList listPool = new java.util.ArrayList();
> + protected java.util.ArrayList txPool = new java.util.ArrayList();
> +
> + protected static int MAX_POOL_SIZE = 50;
> +
> + protected TxInfo getTxInfo(org.jbossmq.pm.Tx txId, SpyTxLog txLog){
> + if(listPool.isEmpty()){
> + return new TxInfo(txId,txLog);
> + }else{
> + TxInfo info = (TxInfo)listPool.remove(listPool.size()-1);
> + info.txId = txId;
> + info.log = txLog;
> + return info;
> + }
> + }
> +
> + protected void releaseTxInfo(TxInfo list){
> + if(listPool.size() < MAX_POOL_SIZE){
> + list.ackMessages.clear();
> + list.addMessages.clear();
> + listPool.add(list);
> + }
> + }
>
> - public Long createPersistentTx() throws javax.jms.JMSException {
> - Long txId = null;
> + protected org.jbossmq.pm.Tx getTx(long value){
> + if(txPool.isEmpty()){
> + return new org.jbossmq.pm.Tx(value);
> + }else{
> + org.jbossmq.pm.Tx tx =
>(org.jbossmq.pm.Tx)txPool.remove(listPool.size()-1);
> + tx.setValue(value);
> + return tx;
> + }
> + }
> +
> + protected void releaseTx(org.jbossmq.pm.Tx tx){
> + if(txPool.size() < MAX_POOL_SIZE){
> + txPool.add(tx);
> + }
> + }
> +
> + public org.jbossmq.pm.Tx createPersistentTx() throws
>javax.jms.JMSException {
> + org.jbossmq.pm.Tx txId = null;
> SpyTxLog txLog = currentTxLog;
> synchronized(transToTxLogs){
> - txId = new Long(++nextTxId);
> - transToTxLogs.put(txId,new TxInfo(txId,txLog));
> + txId = getTx(++nextTxId);
> + transToTxLogs.put(txId,getTxInfo(txId,txLog));
> }
> txLog.createTx();
> return txId;
> }
>
> - public void commitPersistentTx(Long txId) throws
>javax.jms.JMSException {
> + public void commitPersistentTx(org.jbossmq.pm.Tx txId) throws
>javax.jms.JMSException {
> +//System.out.println("Committing TX
>"+Long.toHexString(txId.longValue()));
> TxInfo info = null;
> LinkedList messagesToDelete = null;
> synchronized(transToTxLogs){
> @@ -101,10 +142,14 @@
> }
> deleteMessages(messagesToDelete);
> info.log.commitTx(txId);
> + synchronized(transToTxLogs){
> + releaseTx(txId);
> + releaseTxInfo(info);
> + }
> checkCleanup(info.log);
> }
>
> - public void rollbackPersistentTx(Long txId) throws
>javax.jms.JMSException {
> + public void rollbackPersistentTx(org.jbossmq.pm.Tx txId) throws
>javax.jms.JMSException {
> TxInfo info = null;
> LinkedList messagesToDelete = null;
> synchronized(transToTxLogs){
> @@ -113,6 +158,10 @@
> }
> deleteMessages(messagesToDelete);
> info.log.rollbackTx(txId);
> + synchronized(transToTxLogs){
> + releaseTx(txId);
> + releaseTxInfo(info);
> + }
> checkCleanup(info.log);
> }
>
> @@ -160,7 +209,7 @@
> }
> checkCleanup(oldLog);
> }catch(java.net.MalformedURLException e){
> - JMSException jme = new JMSException("Error rolling over logs to
>new files.");
> + JMSException jme = new SpyJMSException("Error rolling over logs
>to new files.");
> jme.setLinkedException(e);
> throw jme;
> }
> @@ -224,7 +273,7 @@
> log = (SpyMessageLog)logs.remove(key);
> }
> if( log == null )
> - throw new JMSException("The persistence log was never
>initialized");
> + throw new SpyJMSException("The persistence log was never
>initialized");
> log.close();
> log.delete();
>
> @@ -257,8 +306,8 @@
>
> }
>
> - public void add(org.jbossmq.SpyMessage message, Long txId) throws
>javax.jms.JMSException {
> -
> + public void add(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx
>txId) throws javax.jms.JMSException {
> +//System.out.println("Add message
>"+Long.toHexString(message.messageId)+" in trans
>"+Long.toHexString(txId.longValue())+" to "+message.getJMSDestination());
> LogInfo logInfo;
>
> SpyTxLog txLog = null;
> @@ -275,7 +324,7 @@
> logs = (HashMap)messageLogs.get(txLog);
> }
> synchronized(logs){
> - logInfo = (LogInfo) logs.get(""+message.getJMSDestination());
> + logInfo = (LogInfo)
>logs.get(message.getJMSDestination().toString());
> }
>
> if (logInfo == null)
> @@ -295,14 +344,15 @@
> checkRollOver();
> }
>
> - public void remove(org.jbossmq.SpyMessage message, Long txId) throws
>javax.jms.JMSException {
> + public void remove(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx
>txId) throws javax.jms.JMSException {
> +//System.out.println("Removing message
>"+Long.toHexString(message.messageId)+" in trans
>"+Long.toHexString(txId.longValue())+" from "+message.getJMSDestination());
>
> LogInfo logInfo;
>
> SpyTxLog txLog = ((LogInfo)message.persistData).txLog;
> synchronized (messageLogs) {
> HashMap logs = (HashMap)messageLogs.get(txLog);
> - logInfo = (LogInfo) logs.get(""+message.getJMSDestination());
> + logInfo = (LogInfo)
>logs.get(message.getJMSDestination().toString());
> }
>
> if (logInfo == null)
> @@ -354,7 +404,6 @@
> if(DEBUG) System.out.println("Using new rolling logged persistence
>manager.");
>
> URL configFile =
>getClass().getClassLoader().getResource("jboss.jcml");
> -
> dataDirURL = new URL(configFile, dataDirectory);
>
> //Get an InitialContext
> @@ -386,7 +435,7 @@
> }
>
> if(!commitedTxs.isEmpty())
> - nextTxId = ((Long)commitedTxs.last()).longValue();
> + nextTxId = ((org.jbossmq.pm.Tx)commitedTxs.last()).longValue();
>
> for(int i=0;i<dataFiles.length;++i){
> String name = dataFiles[i].getName();
>
>
>
> 1.2 +0 -0
>jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManagerMBean.java
>
> Index: PersistenceManagerMBean.java
> ===================================================================
> RCS file:
>/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManagerMBean.java,v
> retrieving revision 1.1
> retrieving revision 1.2
> diff -u -r1.1 -r1.2
> --- PersistenceManagerMBean.java 2001/07/31 20:17:52 1.1
> +++ PersistenceManagerMBean.java 2001/08/09 01:18:28 1.2
> @@ -13,7 +13,7 @@
> *
> * @see <related>
> * @author Vincent Sheffer ([EMAIL PROTECTED])
> - * @version $Revision: 1.1 $
> + * @version $Revision: 1.2 $
> */
> public interface PersistenceManagerMBean
> extends org.jboss.util.ServiceMBean
>
>
>
> 1.2 +23 -50
>jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyMessageLog.java
>
> Index: SpyMessageLog.java
> ===================================================================
> RCS file:
>/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyMessageLog.java,v
> retrieving revision 1.1
> retrieving revision 1.2
> diff -u -r1.1 -r1.2
> --- SpyMessageLog.java 2001/07/31 20:17:52 1.1
> +++ SpyMessageLog.java 2001/08/09 01:18:28 1.2
> @@ -6,6 +6,8 @@
> */
> package org.jbossmq.pm.rollinglogged;
>
> +import org.jbossmq.SpyJMSException;
> +
> import java.io.IOException;
> import java.io.Serializable;
> import javax.jms.JMSException;
> @@ -18,42 +20,21 @@
> * provider failure. Integrety is kept by the use of an
>ObjectIntegrityLog.
> *
> * @author: Hiram Chirino ([EMAIL PROTECTED])
> - * @version $Revision: 1.1 $
> + * @version $Revision: 1.2 $
> */
> public class SpyMessageLog {
>
> /////////////////////////////////////////////////////////////////////
> // Attributes
> - /////////////////////////////////////////////////////////////////////
> - private ObjectIntegrityLog transactionLog;
> - private MessageAddedRecord messageAddedRecord = new
>MessageAddedRecord();
> - private MessageRemovedRecord messageRemovedRecord = new
>MessageRemovedRecord();
> -
> /////////////////////////////////////////////////////////////////////
> - // Helper Inner Classes
> - /////////////////////////////////////////////////////////////////////
> - static class MessageAddedRecord implements Serializable {
> - long messageId;
> - boolean isTransacted;
> - long transactionId;
> - SpyMessage message;
> - private final static long serialVersionUID = 235726945332013954L;
> - }
> + private IntegrityLog transactionLog;
>
> - static class MessageRemovedRecord implements Serializable {
> - boolean isTransacted;
> - long transactionId;
> - long messageId;
> - private final static long serialVersionUID = 235726945332013955L;
> - }
> -
> -
> /////////////////////////////////////////////////////////////////////
> // Constructor
> /////////////////////////////////////////////////////////////////////
> public SpyMessageLog(String fileName) throws JMSException {
> try {
> - transactionLog = new ObjectIntegrityLog(fileName);
> + transactionLog = new IntegrityLog(fileName);
> } catch ( IOException e ) {
> throwJMSException("Could not open the queue's tranaction log:
>"+fileName,e);
> }
> @@ -79,19 +60,14 @@
> }
> }
>
> - synchronized public void add( SpyMessage message, Long transactionId )
>throws JMSException {
> + synchronized public void add( SpyMessage message, org.jbossmq.pm.Tx
>transactionId ) throws JMSException {
> try{
>
> - messageAddedRecord.message = message;
> - messageAddedRecord.messageId = message.messageId;
> if( transactionId == null ) {
> - messageAddedRecord.isTransacted = false;
> + transactionLog.add(message.messageId,false,-1,message);
> } else {
> - messageAddedRecord.isTransacted = true;
> - messageAddedRecord.transactionId =
>transactionId.longValue();
> +
>transactionLog.add(message.messageId,true,transactionId.longValue(),message);
> }
> -
> - transactionLog.add(messageAddedRecord);
> transactionLog.commit();
>
> } catch ( IOException e ) {
> @@ -100,17 +76,14 @@
>
> }
>
> - synchronized public void remove( SpyMessage message, Long
>transactionId ) throws JMSException {
> + synchronized public void remove( SpyMessage message, org.jbossmq.pm.Tx
>transactionId ) throws JMSException {
> try{
>
> - messageRemovedRecord.messageId = message.messageId;
> if( transactionId == null ) {
> - messageRemovedRecord.isTransacted = false;
> + transactionLog.remove(message.messageId,false,-1);
> } else {
> - messageRemovedRecord.isTransacted = true;
> - messageRemovedRecord.transactionId =
>transactionId.longValue();
> +
>transactionLog.remove(message.messageId,true,transactionId.longValue());
> }
> - transactionLog.add(messageRemovedRecord);
> transactionLog.commit();
>
> } catch ( IOException e ) {
> @@ -124,29 +97,29 @@
> java.util.HashMap messageIndex = new java.util.HashMap();
>
> try {
> - ObjectIntegrityLog.IndexItem objects[] =
>transactionLog.toIndex();
> + java.util.LinkedList objects = transactionLog.toIndex();
>
> - for( int i=0; i < objects.length; i++ ) {
> + for(java.util.Iterator it = objects.iterator();it.hasNext(); )
>{
>
> - Object o = objects[i].record;
> - if( o instanceof MessageAddedRecord ) {
> + Object o = it.next();
> + if( o instanceof IntegrityLog.MessageAddedRecord ) {
>
> - MessageAddedRecord r = (MessageAddedRecord)o;
> + IntegrityLog.MessageAddedRecord r =
>(IntegrityLog.MessageAddedRecord)o;
> r.message.messageId = r.messageId;
>
> - if( r.isTransacted && !commited.contains(new
>Long(r.transactionId)) ) {
> + if( r.isTransacted && !commited.contains(new
>org.jbossmq.pm.Tx(r.transactionId)) ) {
> // the TX this message was part of was
>not
> // commited... so drop this message
> continue;
> }
>
> - messageIndex.put( new Long(r.messageId),
>objects[i]);
> + messageIndex.put( new Long(r.messageId), o);
>
> - } else if( o instanceof MessageRemovedRecord ) {
> + } else if( o instanceof
>IntegrityLog.MessageRemovedRecord ) {
>
> - MessageRemovedRecord r =
>(MessageRemovedRecord)o;
> + IntegrityLog.MessageRemovedRecord r =
>(IntegrityLog.MessageRemovedRecord)o;
>
> - if( r.isTransacted && !commited.contains(new
>Long(r.transactionId)) ) {
> + if( r.isTransacted && !commited.contains(new
>org.jbossmq.pm.Tx(r.transactionId)) ) {
> // the TX this message was part of was
>not
> // commited... so drop this message
> continue;
> @@ -158,20 +131,20 @@
>
> }
> } catch ( Exception e ) {
> +// e.printStackTrace();
> throwJMSException("Could not rebuild the queue from the
>queue's
>tranaction log.",e);
> }
>
> SpyMessage rc[] = new SpyMessage[messageIndex.size()];
> java.util.Iterator iter = messageIndex.values().iterator();
> for( int i=0; iter.hasNext(); i++ ) {
> - ObjectIntegrityLog.IndexItem item =
>(ObjectIntegrityLog.IndexItem)iter.next();
> - rc[i] = ((MessageAddedRecord)item.record).message;
> + rc[i] = ((IntegrityLog.MessageAddedRecord)iter.next()).message;
> }
> return rc;
> }
>
> private void throwJMSException(String message, Exception e) throws
>JMSException {
> - JMSException newE = new JMSException(message);
> + JMSException newE = new SpyJMSException(message);
> newE.setLinkedException(e);
> throw newE;
> }
>
>
>
> 1.2 +8 -6
>jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyTxLog.java
>
> Index: SpyTxLog.java
> ===================================================================
> RCS file:
>/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyTxLog.java,v
> retrieving revision 1.1
> retrieving revision 1.2
> diff -u -r1.1 -r1.2
> --- SpyTxLog.java 2001/07/31 20:17:52 1.1
> +++ SpyTxLog.java 2001/08/09 01:18:28 1.2
> @@ -6,6 +6,8 @@
> */
> package org.jbossmq.pm.rollinglogged;
>
> +import org.jbossmq.SpyJMSException;
> +
> import java.io.Serializable;
> import java.io.IOException;
>
> @@ -15,14 +17,14 @@
> * This is used to keep a log of commited transactions.
> *
> * @author: Hiram Chirino ([EMAIL PROTECTED])
> - * @version $Revision: 1.1 $
> + * @version $Revision: 1.2 $
> */
> public class SpyTxLog {
>
> /////////////////////////////////////////////////////////////////////
> // Attributes
> /////////////////////////////////////////////////////////////////////
> - private ObjectIntegrityLog transactionLog;
> + private IntegrityLog transactionLog;
> private int liveTransactionCount = 0;
> private Object counterLock = new Object();
>
> @@ -31,7 +33,7 @@
> /////////////////////////////////////////////////////////////////////
> public SpyTxLog(String fileName) throws JMSException {
> try {
> - transactionLog = new ObjectIntegrityLog(fileName);
> + transactionLog = new IntegrityLog(fileName);
> } catch (IOException e) {
> throwJMSException("Could not open the queue's tranaction log:
>" +
>fileName, e);
> }
> @@ -69,10 +71,10 @@
> }
> }
>
> - synchronized public void commitTx(Long id) throws JMSException {
> + synchronized public void commitTx(org.jbossmq.pm.Tx id) throws
>JMSException {
>
> try {
> - transactionLog.add(id);
> + transactionLog.addTx(id);
> transactionLog.commit();
> synchronized(counterLock){
> --liveTransactionCount;
> @@ -91,7 +93,7 @@
> }
> }
>
> - public void rollbackTx(Long txId) throws JMSException {
> + public void rollbackTx(org.jbossmq.pm.Tx txId) throws JMSException {
> synchronized(counterLock){
> --liveTransactionCount;
> }
> @@ -101,7 +103,7 @@
> // Private Methods
> /////////////////////////////////////////////////////////////////////
> private void throwJMSException(String message, Exception e) throws
>JMSException {
> - JMSException newE = new JMSException(message);
> + JMSException newE = new SpyJMSException(message);
> newE.setLinkedException(e);
> throw newE;
> }
>
>
>
>
>_______________________________________________
>Jboss-development mailing list
>[EMAIL PROTECTED]
>http://lists.sourceforge.net/lists/listinfo/jboss-development
_________________________________________________________________
Get your FREE download of MSN Explorer at http://explorer.msn.com/intl.asp
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development