User: pkendall
Date: 01/08/08 18:18:28
Modified: src/main/org/jbossmq/pm/rollinglogged IntegrityLog.java
PersistenceManager.java
PersistenceManagerMBean.java SpyMessageLog.java
SpyTxLog.java
Removed: src/main/org/jbossmq/pm/rollinglogged
ObjectIntegrityLog.java
Log:
Major updates (especially to topics).
Speed improvements.
Make JVM IL work (by using a singleton JMSServer).
Message Listeners re-implemented using client-side thread.
Revision Changes Path
1.2 +173 -169 jbossmq/src/main/org/jbossmq/pm/rollinglogged/IntegrityLog.java
Index: IntegrityLog.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/IntegrityLog.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- IntegrityLog.java 2001/07/31 20:17:52 1.1
+++ IntegrityLog.java 2001/08/09 01:18:28 1.2
@@ -6,150 +6,101 @@
*/
package org.jbossmq.pm.rollinglogged;
-import java.io.RandomAccessFile;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.IOException;
-import java.io.File;
+import java.io.*;
-
+import org.jbossmq.*;
/**
* This class is used to create a log file which which will will garantee
- * it's integrety up to the last commit point.
- *
- * The InputStream returned by getInputStream() will read
- * data placed into the log with the OutputStream returned by
- * getOutputStream(). The EOF for the InputStream is the
- * last commited point of the OutputStream.
+ * it's integrety up to the last commit point. An optimised version of the
+ * integrityLog in the logged persistence.
*
- * @author: Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.1 $
+ * @author: David Maplesden ([EMAIL PROTECTED])
+ * @version $Revision: 1.2 $
*/
-public class IntegrityLog {
+public class IntegrityLog{
/////////////////////////////////////////////////////////////////////
// Attributes
/////////////////////////////////////////////////////////////////////
- private static final int HEADER_SIZE=16; // in bytes
-
- // Header related stuff
- private long firstRecordPos;
- private long nextRecordPos;
- private byte headerBytes[]=new byte[HEADER_SIZE];
private RandomAccessFile raf;
private File f;
-
- private LogOutputStream outputStream;
- private LogInputStream inputStream;
-
-
- /////////////////////////////////////////////////////////////////////
- // Helper Inner Classes
- /////////////////////////////////////////////////////////////////////
- class LogInputStream extends InputStream {
-
- boolean closed = false;
- long inputPos = 0;
+ private ObjectOutput objectOutput;
- public long getFilePointer() {
- return inputPos;
+ ////////////////////////////////////////////////////////////
+ // Helper Inner classes. //
+ ////////////////////////////////////////////////////////////
+
+ class MyOutputStream extends OutputStream{
+ public void close() throws IOException{
+ flush();
+ }
+ public void write(int b) throws IOException {
+ raf.write( (byte)b );
}
- public void close() throws IOException {
- super.close();
- closed = true;
+ public void write(byte bytes[], int off, int len) throws IOException {
+ raf.write( bytes, off, len );
}
- public int read() throws IOException {
- inputPos = Math.max(inputPos, firstRecordPos);
- int rc = IntegrityLog.this.read(inputPos);
- if( rc >= 0 )
- inputPos ++;
- return rc;
+ }
+
+ class MyObjectOutputStream extends ObjectOutputStream {
+ MyObjectOutputStream(OutputStream os) throws IOException {
+ super(os);
}
- public int read(byte bytes[], int off, int len) throws IOException {
- inputPos = Math.max(inputPos, firstRecordPos);
- int rc = IntegrityLog.this.read(inputPos, bytes, off, len);
- if( rc >= 0 )
- inputPos += rc;
- return rc;
+ protected void writeStreamHeader() {
}
}
- class LogOutputStream extends OutputStream {
- boolean closed = false;
- public long getFilePointer() {
- return nextRecordPos;
- }
- public void close() throws IOException {
- super.close();
- closed = true;
+ class MyObjectInputStream extends ObjectInputStream {
+ MyObjectInputStream(InputStream is) throws IOException {
+ super(is);
}
- public void write(int b) throws IOException {
- IntegrityLog.this.write( (byte)b );
+ protected void readStreamHeader() {
}
- public void write(byte bytes[], int off, int len) throws IOException {
- IntegrityLog.this.write( bytes, off, len );
+ }
+
+ class MyInputStream extends InputStream{
+ public void close() throws IOException{
+ }
+ public int read() throws IOException {
+ return raf.read( );
}
+ public int read(byte bytes[], int off, int len) throws IOException {
+ return raf.read( bytes, off, len );
+ }
+ }
+
+ class MessageAddedRecord implements Serializable {
+ long messageId;
+ boolean isTransacted;
+ long transactionId;
+ SpyMessage message;
+ private final static long serialVersionUID = 235726945332013954L;
}
+ class MessageRemovedRecord implements Serializable {
+ boolean isTransacted;
+ long transactionId;
+ long messageId;
+ private final static long serialVersionUID = 235726945332013955L;
+ }
/////////////////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////////////////
public IntegrityLog(String fileName) throws IOException {
f = new File(fileName);
- boolean exists = f.isFile();
-
raf = new RandomAccessFile(f, "rw");
- if( exists ) {
- loadHeader();
- } else {
- initHeader();
- }
+ this.objectOutput = new MyObjectOutputStream(new MyOutputStream());
+ seekEnd();
}
/////////////////////////////////////////////////////////////////////
// Public Methods
/////////////////////////////////////////////////////////////////////
- public LogInputStream getInputStream() {
- if ( inputStream==null || inputStream.closed ) {
- inputStream = new LogInputStream();
- }
- return inputStream;
- }
-
- public LogOutputStream getOutputStream() throws IOException {
- if ( outputStream==null || outputStream.closed ) {
- outputStream = new LogOutputStream();
- }
- return outputStream;
- }
public void commit() throws IOException {
-
- headerBytes[0] = (byte)((firstRecordPos >>> 56) & 0xFF);
- headerBytes[1] = (byte)((firstRecordPos >>> 48) & 0xFF);
- headerBytes[2] = (byte)((firstRecordPos >>> 40) & 0xFF);
- headerBytes[3] = (byte)((firstRecordPos >>> 32) & 0xFF);
- headerBytes[4] = (byte)((firstRecordPos >>> 24) & 0xFF);
- headerBytes[5] = (byte)((firstRecordPos >>> 16) & 0xFF);
- headerBytes[6] = (byte)((firstRecordPos >>> 8) & 0xFF);
- headerBytes[7] = (byte)((firstRecordPos >>> 0) & 0xFF);
- headerBytes[8] = (byte)((nextRecordPos >>> 56) & 0xFF);
- headerBytes[9] = (byte)((nextRecordPos >>> 48) & 0xFF);
- headerBytes[10] =(byte)((nextRecordPos >>> 40) & 0xFF);
- headerBytes[11] =(byte)((nextRecordPos >>> 32) & 0xFF);
- headerBytes[12] =(byte)((nextRecordPos >>> 24) & 0xFF);
- headerBytes[13] =(byte)((nextRecordPos >>> 16) & 0xFF);
- headerBytes[14] =(byte)((nextRecordPos >>> 8) & 0xFF);
- headerBytes[15] =(byte)((nextRecordPos >>> 0) & 0xFF);
-
- raf.seek(0);
- raf.write(headerBytes);
- }
-
- public void rollback() throws IOException {
- loadHeader();
+ //raf.getFD().sync();
}
public void delete() throws IOException {
@@ -160,79 +111,132 @@
raf.close();
raf = null;
}
-
- /////////////////////////////////////////////////////////////////////
- // Private Methods
- /////////////////////////////////////////////////////////////////////
- private long getBytesLeft(long offset) {
-
- return nextRecordPos-offset;
-
- }
-
- private void initHeader() throws IOException {
-
- firstRecordPos = HEADER_SIZE;
- nextRecordPos = HEADER_SIZE;
-
- commit();
- }
-
- private void loadHeader() throws IOException {
- raf.seek(0);
- firstRecordPos = raf.readLong();
- nextRecordPos = raf.readLong();
-
- }
-
- private int read(long offset) throws IOException {
-
- if( offset >= nextRecordPos )
- return -1;
-
- if( raf.getFilePointer() != offset ) {
- raf.seek(offset);
- }
-
- int rc = raf.read();
- return rc;
-
- }
-
- private int read(long offset, byte bytes[], int off, int len) throws
IOException {
-
- if( offset >= nextRecordPos )
- return -1;
+ protected static final byte TX = 0;
+ protected static final byte ADD = 1;
+ protected static final byte REMOVE = 2;
+
+ public synchronized void addTx(org.jbossmq.pm.Tx tx) throws IOException{
+ raf.writeByte(TX);
+ raf.writeLong(tx.longValue());
+ }
- len = (int)Math.min(len, getBytesLeft(offset));
+ public synchronized void add(long messageID, boolean isTransacted, long txId,
SpyMessage message)throws IOException{
+ raf.writeByte(ADD);
+ raf.writeLong(messageID);
+ raf.writeBoolean(isTransacted);
+ raf.writeLong(txId);
+ SpyMessage.writeMessage(message,objectOutput);
+ objectOutput.flush();
+ }
- if( raf.getFilePointer() != offset ) {
- raf.seek(offset);
- }
+ public synchronized void remove(long messageID, boolean isTransacted, long
txId)throws IOException{
+ raf.writeByte(REMOVE);
+ raf.writeLong(messageID);
+ raf.writeBoolean(isTransacted);
+ raf.writeLong(txId);
+ }
- int rc = raf.read(bytes, off, len);
- return rc;
+ public void skipNextEntry(ObjectInput in) throws IOException{
+ byte type = raf.readByte();
+ switch(type){
+ case TX:
+ raf.readLong();
+ return;
+ case ADD:
+ raf.readLong();
+ raf.readBoolean();
+ raf.readLong();
+ SpyMessage.readMessage(in);
+ return;
+ case REMOVE:
+ raf.readLong();
+ raf.readBoolean();
+ raf.readLong();
+ return;
+ default:
+ throw new java.io.IOException("Error in log file format.");
+ }
+ }
- }
+ public java.util.LinkedList toIndex() throws IOException{
+ raf.seek(0);
+ long length = raf.length();
+ long pos = 0;
+ ObjectInput in = new MyObjectInputStream(new MyInputStream());
+ java.util.LinkedList ll = new java.util.LinkedList();
+ try{
+ while(pos < length){
+ ll.add(readNextEntry(in));
+ pos = raf.getFilePointer();
+ }
+ }catch(EOFException e){
+ //incomplete record
+ }
+ in.close();
+ raf.seek(pos);
- private void write(byte []record, int off, int len) throws IOException {
- if( raf.getFilePointer() != nextRecordPos ) {
- raf.seek(nextRecordPos);
- }
+ return ll;
+ }
- raf.write(record, off, len);
- nextRecordPos+=len;
- }
+ public java.util.TreeSet toTreeSet() throws IOException{
+ raf.seek(0);
+ long length = raf.length();
+ long pos = 0;
+ ObjectInput in = new MyObjectInputStream(new MyInputStream());
+ java.util.TreeSet ll = new java.util.TreeSet();
+ try{
+ while(pos < length){
+ ll.add(readNextEntry(in));
+ pos = raf.getFilePointer();
+ }
+ }catch(EOFException e){
+ //incomplete record
+ }
+ in.close();
+ raf.seek(pos);
- private void write(byte b) throws IOException {
+ return ll;
+ }
- if( raf.getFilePointer() != nextRecordPos ) {
- raf.seek(nextRecordPos);
- }
+ public Object readNextEntry(ObjectInput in) throws IOException{
+ byte type = raf.readByte();
+ switch(type){
+ case TX:
+ return new org.jbossmq.pm.Tx(raf.readLong());
+ case ADD:
+ MessageAddedRecord add = new MessageAddedRecord();
+ add.messageId = raf.readLong();
+ add.isTransacted = raf.readBoolean();
+ add.transactionId = raf.readLong();
+ add.message = SpyMessage.readMessage(in);
+ return add;
+ case REMOVE:
+ MessageRemovedRecord remove = new MessageRemovedRecord();
+ remove.messageId = raf.readLong();
+ remove.isTransacted = raf.readBoolean();
+ remove.transactionId = raf.readLong();
+ return remove;
+ default:
+ throw new java.io.IOException("Error in log file format.");
+ }
+ }
- raf.write(b);
- nextRecordPos++;
+ private void seekEnd() throws IOException {
+ raf.seek(0);
+ long length = raf.length();
+ long pos = 0;
+ ObjectInput in = new MyObjectInputStream(new MyInputStream());
+ try{
+ while(pos < length){
+ skipNextEntry(in);
+ pos = raf.getFilePointer();
+ }
+ }catch(EOFException e){
+ //incomplete record, must have been due to program failure during write.
+ }
+ in.close();
+ raf.seek(pos);
}
}
1.3 +68 -19
jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManager.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- PersistenceManager.java 2001/08/01 20:22:35 1.2
+++ PersistenceManager.java 2001/08/09 01:18:28 1.3
@@ -14,8 +14,8 @@
import org.jbossmq.server.JMSDestination;
import org.jbossmq.SpyMessage;
import org.jbossmq.SpyDestination;
+import org.jbossmq.SpyJMSException;
-
import javax.naming.InitialContext;
import org.jbossmq.pm.TxManager;
import org.jboss.util.ServiceMBeanSupport;
@@ -25,9 +25,9 @@
/**
* This class manages all persistence related services.
*
- * @author David Maplesden
+ * @author David Maplesden ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class PersistenceManager
extends ServiceMBeanSupport
@@ -71,28 +71,69 @@
}
static class TxInfo {
- Long txId;
+ org.jbossmq.pm.Tx txId;
LinkedList addMessages = new LinkedList();
LinkedList ackMessages = new LinkedList();
SpyTxLog log;
- TxInfo(Long txId,SpyTxLog log){
+ TxInfo(org.jbossmq.pm.Tx txId,SpyTxLog log){
this.txId = txId;
this.log = log;
}
}
+
+ protected java.util.ArrayList listPool = new java.util.ArrayList();
+ protected java.util.ArrayList txPool = new java.util.ArrayList();
+
+ protected static int MAX_POOL_SIZE = 50;
+
+ protected TxInfo getTxInfo(org.jbossmq.pm.Tx txId, SpyTxLog txLog){
+ if(listPool.isEmpty()){
+ return new TxInfo(txId,txLog);
+ }else{
+ TxInfo info = (TxInfo)listPool.remove(listPool.size()-1);
+ info.txId = txId;
+ info.log = txLog;
+ return info;
+ }
+ }
+
+ protected void releaseTxInfo(TxInfo list){
+ if(listPool.size() < MAX_POOL_SIZE){
+ list.ackMessages.clear();
+ list.addMessages.clear();
+ listPool.add(list);
+ }
+ }
- public Long createPersistentTx() throws javax.jms.JMSException {
- Long txId = null;
+ protected org.jbossmq.pm.Tx getTx(long value){
+ if(txPool.isEmpty()){
+ return new org.jbossmq.pm.Tx(value);
+ }else{
+ org.jbossmq.pm.Tx tx = (org.jbossmq.pm.Tx)txPool.remove(listPool.size()-1);
+ tx.setValue(value);
+ return tx;
+ }
+ }
+
+ protected void releaseTx(org.jbossmq.pm.Tx tx){
+ if(txPool.size() < MAX_POOL_SIZE){
+ txPool.add(tx);
+ }
+ }
+
+ public org.jbossmq.pm.Tx createPersistentTx() throws javax.jms.JMSException {
+ org.jbossmq.pm.Tx txId = null;
SpyTxLog txLog = currentTxLog;
synchronized(transToTxLogs){
- txId = new Long(++nextTxId);
- transToTxLogs.put(txId,new TxInfo(txId,txLog));
+ txId = getTx(++nextTxId);
+ transToTxLogs.put(txId,getTxInfo(txId,txLog));
}
txLog.createTx();
return txId;
}
- public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
+ public void commitPersistentTx(org.jbossmq.pm.Tx txId) throws
javax.jms.JMSException {
+//System.out.println("Committing TX "+Long.toHexString(txId.longValue()));
TxInfo info = null;
LinkedList messagesToDelete = null;
synchronized(transToTxLogs){
@@ -101,10 +142,14 @@
}
deleteMessages(messagesToDelete);
info.log.commitTx(txId);
+ synchronized(transToTxLogs){
+ releaseTx(txId);
+ releaseTxInfo(info);
+ }
checkCleanup(info.log);
}
- public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
+ public void rollbackPersistentTx(org.jbossmq.pm.Tx txId) throws
javax.jms.JMSException {
TxInfo info = null;
LinkedList messagesToDelete = null;
synchronized(transToTxLogs){
@@ -113,6 +158,10 @@
}
deleteMessages(messagesToDelete);
info.log.rollbackTx(txId);
+ synchronized(transToTxLogs){
+ releaseTx(txId);
+ releaseTxInfo(info);
+ }
checkCleanup(info.log);
}
@@ -160,7 +209,7 @@
}
checkCleanup(oldLog);
}catch(java.net.MalformedURLException e){
- JMSException jme = new JMSException("Error rolling over logs to new files.");
+ JMSException jme = new SpyJMSException("Error rolling over logs to new
files.");
jme.setLinkedException(e);
throw jme;
}
@@ -224,7 +273,7 @@
log = (SpyMessageLog)logs.remove(key);
}
if( log == null )
- throw new JMSException("The persistence log was never initialized");
+ throw new SpyJMSException("The persistence log was never initialized");
log.close();
log.delete();
@@ -257,8 +306,8 @@
}
- public void add(org.jbossmq.SpyMessage message, Long txId) throws
javax.jms.JMSException {
-
+ public void add(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId) throws
javax.jms.JMSException {
+//System.out.println("Add message "+Long.toHexString(message.messageId)+" in trans
"+Long.toHexString(txId.longValue())+" to "+message.getJMSDestination());
LogInfo logInfo;
SpyTxLog txLog = null;
@@ -275,7 +324,7 @@
logs = (HashMap)messageLogs.get(txLog);
}
synchronized(logs){
- logInfo = (LogInfo) logs.get(""+message.getJMSDestination());
+ logInfo = (LogInfo)
logs.get(message.getJMSDestination().toString());
}
if (logInfo == null)
@@ -295,14 +344,15 @@
checkRollOver();
}
- public void remove(org.jbossmq.SpyMessage message, Long txId) throws
javax.jms.JMSException {
+ public void remove(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId)
throws javax.jms.JMSException {
+//System.out.println("Removing message "+Long.toHexString(message.messageId)+" in
trans "+Long.toHexString(txId.longValue())+" from "+message.getJMSDestination());
LogInfo logInfo;
SpyTxLog txLog = ((LogInfo)message.persistData).txLog;
synchronized (messageLogs) {
HashMap logs = (HashMap)messageLogs.get(txLog);
- logInfo = (LogInfo) logs.get(""+message.getJMSDestination());
+ logInfo = (LogInfo)
logs.get(message.getJMSDestination().toString());
}
if (logInfo == null)
@@ -354,7 +404,6 @@
if(DEBUG) System.out.println("Using new rolling logged persistence manager.");
URL configFile = getClass().getClassLoader().getResource("jboss.jcml");
-
dataDirURL = new URL(configFile, dataDirectory);
//Get an InitialContext
@@ -386,7 +435,7 @@
}
if(!commitedTxs.isEmpty())
- nextTxId = ((Long)commitedTxs.last()).longValue();
+ nextTxId = ((org.jbossmq.pm.Tx)commitedTxs.last()).longValue();
for(int i=0;i<dataFiles.length;++i){
String name = dataFiles[i].getName();
1.2 +0 -0
jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManagerMBean.java
Index: PersistenceManagerMBean.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManagerMBean.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- PersistenceManagerMBean.java 2001/07/31 20:17:52 1.1
+++ PersistenceManagerMBean.java 2001/08/09 01:18:28 1.2
@@ -13,7 +13,7 @@
*
* @see <related>
* @author Vincent Sheffer ([EMAIL PROTECTED])
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public interface PersistenceManagerMBean
extends org.jboss.util.ServiceMBean
1.2 +23 -50 jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyMessageLog.java
Index: SpyMessageLog.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyMessageLog.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyMessageLog.java 2001/07/31 20:17:52 1.1
+++ SpyMessageLog.java 2001/08/09 01:18:28 1.2
@@ -6,6 +6,8 @@
*/
package org.jbossmq.pm.rollinglogged;
+import org.jbossmq.SpyJMSException;
+
import java.io.IOException;
import java.io.Serializable;
import javax.jms.JMSException;
@@ -18,42 +20,21 @@
* provider failure. Integrety is kept by the use of an ObjectIntegrityLog.
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyMessageLog {
/////////////////////////////////////////////////////////////////////
// Attributes
- /////////////////////////////////////////////////////////////////////
- private ObjectIntegrityLog transactionLog;
- private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
- private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
-
/////////////////////////////////////////////////////////////////////
- // Helper Inner Classes
- /////////////////////////////////////////////////////////////////////
- static class MessageAddedRecord implements Serializable {
- long messageId;
- boolean isTransacted;
- long transactionId;
- SpyMessage message;
- private final static long serialVersionUID = 235726945332013954L;
- }
+ private IntegrityLog transactionLog;
- static class MessageRemovedRecord implements Serializable {
- boolean isTransacted;
- long transactionId;
- long messageId;
- private final static long serialVersionUID = 235726945332013955L;
- }
-
-
/////////////////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////////////////
public SpyMessageLog(String fileName) throws JMSException {
try {
- transactionLog = new ObjectIntegrityLog(fileName);
+ transactionLog = new IntegrityLog(fileName);
} catch ( IOException e ) {
throwJMSException("Could not open the queue's tranaction log:
"+fileName,e);
}
@@ -79,19 +60,14 @@
}
}
- synchronized public void add( SpyMessage message, Long transactionId ) throws
JMSException {
+ synchronized public void add( SpyMessage message, org.jbossmq.pm.Tx
transactionId ) throws JMSException {
try{
- messageAddedRecord.message = message;
- messageAddedRecord.messageId = message.messageId;
if( transactionId == null ) {
- messageAddedRecord.isTransacted = false;
+ transactionLog.add(message.messageId,false,-1,message);
} else {
- messageAddedRecord.isTransacted = true;
- messageAddedRecord.transactionId =
transactionId.longValue();
+
transactionLog.add(message.messageId,true,transactionId.longValue(),message);
}
-
- transactionLog.add(messageAddedRecord);
transactionLog.commit();
} catch ( IOException e ) {
@@ -100,17 +76,14 @@
}
- synchronized public void remove( SpyMessage message, Long transactionId )
throws JMSException {
+ synchronized public void remove( SpyMessage message, org.jbossmq.pm.Tx
transactionId ) throws JMSException {
try{
- messageRemovedRecord.messageId = message.messageId;
if( transactionId == null ) {
- messageRemovedRecord.isTransacted = false;
+ transactionLog.remove(message.messageId,false,-1);
} else {
- messageRemovedRecord.isTransacted = true;
- messageRemovedRecord.transactionId =
transactionId.longValue();
+ transactionLog.remove(message.messageId,true,transactionId.longValue());
}
- transactionLog.add(messageRemovedRecord);
transactionLog.commit();
} catch ( IOException e ) {
@@ -124,29 +97,29 @@
java.util.HashMap messageIndex = new java.util.HashMap();
try {
- ObjectIntegrityLog.IndexItem objects[] =
transactionLog.toIndex();
+ java.util.LinkedList objects = transactionLog.toIndex();
- for( int i=0; i < objects.length; i++ ) {
+ for(java.util.Iterator it = objects.iterator();it.hasNext(); )
{
- Object o = objects[i].record;
- if( o instanceof MessageAddedRecord ) {
+ Object o = it.next();
+ if( o instanceof IntegrityLog.MessageAddedRecord ) {
- MessageAddedRecord r = (MessageAddedRecord)o;
+ IntegrityLog.MessageAddedRecord r =
(IntegrityLog.MessageAddedRecord)o;
r.message.messageId = r.messageId;
- if( r.isTransacted && !commited.contains(new
Long(r.transactionId)) ) {
+ if( r.isTransacted && !commited.contains(new
org.jbossmq.pm.Tx(r.transactionId)) ) {
// the TX this message was part of was
not
// commited... so drop this message
continue;
}
- messageIndex.put( new Long(r.messageId),
objects[i]);
+ messageIndex.put( new Long(r.messageId), o);
- } else if( o instanceof MessageRemovedRecord ) {
+ } else if( o instanceof
IntegrityLog.MessageRemovedRecord ) {
- MessageRemovedRecord r =
(MessageRemovedRecord)o;
+ IntegrityLog.MessageRemovedRecord r =
(IntegrityLog.MessageRemovedRecord)o;
- if( r.isTransacted && !commited.contains(new
Long(r.transactionId)) ) {
+ if( r.isTransacted && !commited.contains(new
org.jbossmq.pm.Tx(r.transactionId)) ) {
// the TX this message was part of was
not
// commited... so drop this message
continue;
@@ -158,20 +131,20 @@
}
} catch ( Exception e ) {
+// e.printStackTrace();
throwJMSException("Could not rebuild the queue from the
queue's tranaction log.",e);
}
SpyMessage rc[] = new SpyMessage[messageIndex.size()];
java.util.Iterator iter = messageIndex.values().iterator();
for( int i=0; iter.hasNext(); i++ ) {
- ObjectIntegrityLog.IndexItem item =
(ObjectIntegrityLog.IndexItem)iter.next();
- rc[i] = ((MessageAddedRecord)item.record).message;
+ rc[i] = ((IntegrityLog.MessageAddedRecord)iter.next()).message;
}
return rc;
}
private void throwJMSException(String message, Exception e) throws
JMSException {
- JMSException newE = new JMSException(message);
+ JMSException newE = new SpyJMSException(message);
newE.setLinkedException(e);
throw newE;
}
1.2 +8 -6 jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyTxLog.java
Index: SpyTxLog.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyTxLog.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyTxLog.java 2001/07/31 20:17:52 1.1
+++ SpyTxLog.java 2001/08/09 01:18:28 1.2
@@ -6,6 +6,8 @@
*/
package org.jbossmq.pm.rollinglogged;
+import org.jbossmq.SpyJMSException;
+
import java.io.Serializable;
import java.io.IOException;
@@ -15,14 +17,14 @@
* This is used to keep a log of commited transactions.
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyTxLog {
/////////////////////////////////////////////////////////////////////
// Attributes
/////////////////////////////////////////////////////////////////////
- private ObjectIntegrityLog transactionLog;
+ private IntegrityLog transactionLog;
private int liveTransactionCount = 0;
private Object counterLock = new Object();
@@ -31,7 +33,7 @@
/////////////////////////////////////////////////////////////////////
public SpyTxLog(String fileName) throws JMSException {
try {
- transactionLog = new ObjectIntegrityLog(fileName);
+ transactionLog = new IntegrityLog(fileName);
} catch (IOException e) {
throwJMSException("Could not open the queue's tranaction log:
" + fileName, e);
}
@@ -69,10 +71,10 @@
}
}
- synchronized public void commitTx(Long id) throws JMSException {
+ synchronized public void commitTx(org.jbossmq.pm.Tx id) throws JMSException {
try {
- transactionLog.add(id);
+ transactionLog.addTx(id);
transactionLog.commit();
synchronized(counterLock){
--liveTransactionCount;
@@ -91,7 +93,7 @@
}
}
- public void rollbackTx(Long txId) throws JMSException {
+ public void rollbackTx(org.jbossmq.pm.Tx txId) throws JMSException {
synchronized(counterLock){
--liveTransactionCount;
}
@@ -101,7 +103,7 @@
// Private Methods
/////////////////////////////////////////////////////////////////////
private void throwJMSException(String message, Exception e) throws
JMSException {
- JMSException newE = new JMSException(message);
+ JMSException newE = new SpyJMSException(message);
newE.setLinkedException(e);
throw newE;
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development