User: chirino
Date: 01/07/10 19:52:17
Added: src/main/org/jbossmq/pm/logged IntegrityLog.java
ObjectIntegrityLog.java PersistenceManager.java
PersistenceManagerMBean.java SpyMessageLog.java
SpyMessageLogTester.java SpyTxLog.java
Log:
These are the 3 PersistenceManager implementations we have so far. They are now all
configured via JMX.
Revision Changes Path
1.1 jbossmq/src/main/org/jbossmq/pm/logged/IntegrityLog.java
Index: IntegrityLog.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.pm.logged;
import java.io.RandomAccessFile;
import java.io.OutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.io.File;
/**
 * A log file that guarantees its integrity up to the last commit point.
 *
 * The file starts with a 16 byte header holding two big-endian longs: the
 * position of the first record and the position just past the last
 * committed byte. Data written through the stream returned by
 * getOutputStream() only becomes a durable part of the log once commit()
 * rewrites that header; rollback() re-reads the header and thereby
 * discards everything written since the last commit.
 *
 * The InputStream returned by getInputStream() reads the data placed into
 * the log with the OutputStream returned by getOutputStream(). Its EOF is
 * the current end of the written data, which after a commit(), a
 * rollback() or a reopen is the last committed point.
 *
 * This class performs no synchronization of its own.
 *
 * NOTE(review): commit() only writes the header through the
 * RandomAccessFile; nothing here forces the data to the storage device.
 * Confirm whether that is sufficient for the intended crash guarantees.
 *
 * @author: Hiram Chirino ([EMAIL PROTECTED])
 * @version $Revision: 1.1 $
 */
public class IntegrityLog {

    /////////////////////////////////////////////////////////////////////
    // Attributes
    /////////////////////////////////////////////////////////////////////
    private static final int HEADER_SIZE = 16; // in bytes

    // Header related stuff
    private long firstRecordPos;
    private long nextRecordPos;
    private byte headerBytes[] = new byte[HEADER_SIZE];

    private RandomAccessFile raf;
    private LogOutputStream outputStream;
    private LogInputStream inputStream;

    /////////////////////////////////////////////////////////////////////
    // Helper Inner Classes
    /////////////////////////////////////////////////////////////////////

    /**
     * Reads the log from the first record up to the current end of data.
     */
    class LogInputStream extends InputStream {
        boolean closed = false;
        long inputPos = 0;

        /**
         * @return the file position of the next byte this stream will read.
         *         A fresh stream reports the position of the first record
         *         (not 0), so the value is comparable with
         *         LogOutputStream.getFilePointer().
         */
        public long getFilePointer() {
            return Math.max(inputPos, firstRecordPos);
        }

        public void close() throws IOException {
            super.close();
            closed = true;
        }

        public int read() throws IOException {
            inputPos = Math.max(inputPos, firstRecordPos);
            int rc = IntegrityLog.this.read(inputPos);
            if (rc >= 0) {
                inputPos++;
            }
            return rc;
        }

        public int read(byte bytes[], int off, int len) throws IOException {
            inputPos = Math.max(inputPos, firstRecordPos);
            int rc = IntegrityLog.this.read(inputPos, bytes, off, len);
            if (rc >= 0) {
                inputPos += rc;
            }
            return rc;
        }
    }

    /**
     * Appends to the log at the current end of data.
     */
    class LogOutputStream extends OutputStream {
        boolean closed = false;

        /**
         * @return the file position at which the next byte will be written.
         */
        public long getFilePointer() {
            return nextRecordPos;
        }

        public void close() throws IOException {
            super.close();
            closed = true;
        }

        public void write(int b) throws IOException {
            IntegrityLog.this.write((byte) b);
        }

        public void write(byte bytes[], int off, int len) throws IOException {
            IntegrityLog.this.write(bytes, off, len);
        }
    }

    /////////////////////////////////////////////////////////////////////
    // Constructor
    /////////////////////////////////////////////////////////////////////

    /**
     * Opens the log stored in the given file, creating it when needed.
     *
     * A file that is shorter than the header (for instance an empty file
     * left behind by a crash before the first header write) cannot contain
     * committed data and is initialised as a new, empty log.
     *
     * @param fileName path of the log file
     * @throws IOException if the file cannot be opened, read or written
     */
    public IntegrityLog(String fileName) throws IOException {
        File f = new File(fileName);
        raf = new RandomAccessFile(f, "rw");
        boolean ready = false;
        try {
            if (raf.length() >= HEADER_SIZE) {
                loadHeader();
            } else {
                initHeader();
            }
            ready = true;
        } finally {
            if (!ready) {
                // Do not leak the file handle when the header is unusable.
                try {
                    raf.close();
                } catch (IOException ignored) {
                    // the original failure is the one worth reporting
                }
                raf = null;
            }
        }
    }

    /////////////////////////////////////////////////////////////////////
    // Public Methods
    /////////////////////////////////////////////////////////////////////

    /**
     * @return the current input stream; a new one, positioned at the first
     *         record, is created when none exists or the last one was closed
     */
    public LogInputStream getInputStream() {
        if (inputStream == null || inputStream.closed) {
            inputStream = new LogInputStream();
        }
        return inputStream;
    }

    /**
     * @return the current output stream; a new one is created when none
     *         exists or the last one was closed
     */
    public LogOutputStream getOutputStream() throws IOException {
        if (outputStream == null || outputStream.closed) {
            outputStream = new LogOutputStream();
        }
        return outputStream;
    }

    /**
     * Makes everything written so far part of the log by rewriting the
     * header with the current record positions.
     */
    public void commit() throws IOException {
        RandomAccessFile file = file();
        putLong(headerBytes, 0, firstRecordPos);
        putLong(headerBytes, 8, nextRecordPos);
        file.seek(0);
        file.write(headerBytes);
    }

    /**
     * Discards everything written since the last commit by re-reading the
     * record positions from the header.
     */
    public void rollback() throws IOException {
        loadHeader();
    }

    /**
     * Closes the underlying file. Calling it again has no effect.
     */
    public void close() throws IOException {
        if (raf == null) {
            return;
        }
        RandomAccessFile file = raf;
        raf = null;
        file.close();
    }

    /////////////////////////////////////////////////////////////////////
    // Private Methods
    /////////////////////////////////////////////////////////////////////

    /** Returns the open file or fails with an IOException after close(). */
    private RandomAccessFile file() throws IOException {
        if (raf == null) {
            throw new IOException("The log has been closed");
        }
        return raf;
    }

    /** Stores value big-endian in dest[offset .. offset+7], as readLong() expects. */
    private static void putLong(byte dest[], int offset, long value) {
        for (int i = 7; i >= 0; i--) {
            dest[offset + i] = (byte) (value & 0xFF);
            value >>>= 8;
        }
    }

    private long getBytesLeft(long offset) {
        return nextRecordPos - offset;
    }

    private void initHeader() throws IOException {
        firstRecordPos = HEADER_SIZE;
        nextRecordPos = HEADER_SIZE;
        commit();
    }

    private void loadHeader() throws IOException {
        RandomAccessFile file = file();
        file.seek(0);
        firstRecordPos = file.readLong();
        nextRecordPos = file.readLong();
    }

    /** Positions the file at offset unless it is already there. */
    private void seekTo(RandomAccessFile file, long offset) throws IOException {
        if (file.getFilePointer() != offset) {
            file.seek(offset);
        }
    }

    private int read(long offset) throws IOException {
        if (offset >= nextRecordPos) {
            return -1;
        }
        RandomAccessFile file = file();
        seekTo(file, offset);
        return file.read();
    }

    private int read(long offset, byte bytes[], int off, int len) throws IOException {
        if (offset >= nextRecordPos) {
            return -1;
        }
        // never read past the end of the written data
        len = (int) Math.min(len, getBytesLeft(offset));
        RandomAccessFile file = file();
        seekTo(file, offset);
        return file.read(bytes, off, len);
    }

    private void write(byte[] record, int off, int len) throws IOException {
        RandomAccessFile file = file();
        seekTo(file, nextRecordPos);
        file.write(record, off, len);
        nextRecordPos += len;
    }

    private void write(byte b) throws IOException {
        RandomAccessFile file = file();
        seekTo(file, nextRecordPos);
        file.write(b);
        nextRecordPos++;
    }
}
1.1 jbossmq/src/main/org/jbossmq/pm/logged/ObjectIntegrityLog.java
Index: ObjectIntegrityLog.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.pm.logged;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.io.BufferedInputStream;
import javax.jms.JMSException;
/**
 * Keeps a log of Serializable objects on top of an IntegrityLog.
 *
 * Objects are appended with add() and become part of the log when commit()
 * is called; the to*() methods read back every object currently in the
 * log. Integrity across a process failure is delegated to the underlying
 * IntegrityLog.
 *
 * The object streams used here neither write nor expect the serialization
 * stream header, so the file can be appended to across restarts.
 *
 * @author: Hiram Chirino ([EMAIL PROTECTED])
 * @version $Revision: 1.1 $
 */
public class ObjectIntegrityLog {

    /////////////////////////////////////////////////////////////////////
    // Attributes
    /////////////////////////////////////////////////////////////////////
    private IntegrityLog.LogOutputStream logOutputStream;
    private ObjectOutputStream out;
    private IntegrityLog transactionLog;

    /** A logged object together with the stream position recorded for it. */
    static class IndexItem {
        long recordOffset;
        Object record;
    }

    /////////////////////////////////////////////////////////////////////
    // Helper Inner classes
    /////////////////////////////////////////////////////////////////////
    static class MyObjectOutputStream extends ObjectOutputStream {
        MyObjectOutputStream(OutputStream os) throws IOException {
            super(os);
        }

        /**
         * Disables the writing of the stream header.
         */
        protected void writeStreamHeader() {
        }
    }

    static class MyObjectInputStream extends ObjectInputStream {
        MyObjectInputStream(InputStream is) throws IOException {
            super(is);
        }

        /**
         * Disables the reading of the stream header.
         */
        protected void readStreamHeader() {
        }
    }

    /////////////////////////////////////////////////////////////////////
    // Constructor
    /////////////////////////////////////////////////////////////////////

    /**
     * Opens (or creates) the object log stored in the given file.
     *
     * @param fileName path of the log file
     * @throws IOException if the underlying log cannot be opened
     */
    public ObjectIntegrityLog(String fileName) throws IOException {
        transactionLog = new IntegrityLog(fileName);
        logOutputStream = transactionLog.getOutputStream();
        out = new MyObjectOutputStream(logOutputStream);
    }

    /////////////////////////////////////////////////////////////////////
    // Public Methods
    /////////////////////////////////////////////////////////////////////
    public void commit() throws IOException {
        transactionLog.commit();
    }

    public void rollback() throws IOException {
        transactionLog.rollback();
    }

    public void close() throws IOException {
        transactionLog.close();
    }

    /**
     * Appends an object to the log. The object is not committed until
     * commit() is called.
     *
     * @param o the object to serialize into the log
     * @return the object paired with the output position before the write
     * @throws IOException if the object cannot be written
     */
    public IndexItem add(Object o) throws IOException {
        IndexItem item = new IndexItem();
        item.record = o;
        item.recordOffset = logOutputStream.getFilePointer();
        out.writeObject(o);
        // reset so that later records never refer back to earlier ones
        out.reset();
        out.flush();
        return item;
    }

    /**
     * @return every object in the log, in log order
     */
    public Object[] toArray() throws IOException, ClassNotFoundException {
        java.util.LinkedList ll = new java.util.LinkedList();
        readAll(ll);
        Object rc[] = new Object[ll.size()];
        return (Object[]) ll.toArray(rc);
    }

    /**
     * @return every object in the log, collected into a HashSet
     */
    public java.util.HashSet toHashSet() throws IOException,
            ClassNotFoundException {
        java.util.HashSet hash = new java.util.HashSet();
        readAll(hash);
        return hash;
    }

    /**
     * @return every object in the log, in log order, each paired with the
     *         input stream position observed just before it was read
     */
    public IndexItem[] toIndex() throws IOException, ClassNotFoundException {
        java.util.LinkedList ll = new java.util.LinkedList();
        // No BufferedInputStream here: buffering would read ahead and make
        // the positions reported by the log stream meaningless.
        IntegrityLog.LogInputStream logStream = transactionLog.getInputStream();
        ObjectInputStream in = new MyObjectInputStream(logStream);
        try {
            while (true) {
                IndexItem i = new IndexItem();
                i.recordOffset = logStream.getFilePointer();
                i.record = in.readObject();
                ll.addLast(i);
            }
        } catch (java.io.EOFException endOfLog) {
            // expected: the end of the log has been reached
        } finally {
            // Always close: an unclosed stream would be handed out again,
            // still positioned in the middle of the log.
            in.close();
        }
        IndexItem rc[] = new IndexItem[ll.size()];
        return (IndexItem[]) ll.toArray(rc);
    }

    /**
     * @return every object in the log, collected into a TreeSet
     */
    public java.util.TreeSet toTreeSet() throws IOException,
            ClassNotFoundException {
        java.util.TreeSet treeSet = new java.util.TreeSet();
        readAll(treeSet);
        return treeSet;
    }

    /**
     * @return every object in the log, in log order
     */
    public java.util.Vector toVector() throws IOException, ClassNotFoundException {
        java.util.Vector vector = new java.util.Vector();
        readAll(vector);
        return vector;
    }

    /////////////////////////////////////////////////////////////////////
    // Private Methods
    /////////////////////////////////////////////////////////////////////

    /**
     * Reads every object in the log, in log order, into the given
     * collection. The input stream is closed even when reading fails.
     */
    private void readAll(java.util.Collection sink) throws IOException,
            ClassNotFoundException {
        ObjectInputStream in = new MyObjectInputStream(
                new BufferedInputStream(transactionLog.getInputStream()));
        try {
            while (true) {
                sink.add(in.readObject());
            }
        } catch (java.io.EOFException endOfLog) {
            // expected: the end of the log has been reached
        } finally {
            // Always close: an unclosed stream would be handed out again,
            // still positioned in the middle of the log.
            in.close();
        }
    }
}
1.1 jbossmq/src/main/org/jbossmq/pm/logged/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.pm.logged;
import javax.jms.JMSException;
import java.net.URL;
import java.util.HashMap;
import java.util.TreeSet;
import java.util.Iterator;
import java.util.LinkedList;
import org.jbossmq.xml.XElement;
import org.jbossmq.server.JMSServer;
import org.jbossmq.server.JMSDestination;
import org.jbossmq.SpyMessage;
import org.jbossmq.SpyDestination;
import javax.naming.InitialContext;
import org.jbossmq.pm.TxManager;
import org.jboss.util.ServiceMBeanSupport;
import javax.management.*;
import org.jbossmq.ConnectionToken;
/**
* This class manages all persistence related services.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class PersistenceManager
extends ServiceMBeanSupport
implements org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean,
MBeanRegistration {
private String dataDirectory;
// Log file used to store commited transactions.
SpyTxLog spyTxLog;
// Maps SpyDestinations to SpyMessageLogs
HashMap messageLogs = new HashMap();
static class LogInfo {
SpyMessageLog log;
SpyDestination destination;
String queueId;
LogInfo(SpyMessageLog log, SpyDestination destination, String queueId)
{
this.log=log;
this.destination=destination;
this.queueId=queueId;
}
}
public Long createPersistentTx() throws javax.jms.JMSException {
return spyTxLog.createTx();
}
public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
spyTxLog.commitTx(txId);
}
public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
spyTxLog.rollbackTx(txId);
}
public void initQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
try {
URL logFile = new URL(dataDirURL,
dest.toString()+"-"+queueId+".dat");
SpyMessageLog log = new SpyMessageLog(logFile.getFile());
LogInfo info = new LogInfo(log, dest, queueId);
messageLogs.put(""+dest+"-"+queueId, info);
} catch (javax.jms.JMSException e) {
throw e;
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
newE.setLinkedException(e);
throw newE;
}
}
public void destroyQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
try {
URL logFile = new URL(dataDirURL,
dest.toString()+"-"+queueId+".dat");
java.io.File file = new java.io.File(logFile.getFile());
SpyMessageLog log =
(SpyMessageLog)messageLogs.remove(""+dest+"-"+queueId);
if( log == null )
throw new JMSException("The persistence log was never
initialized");
log.close();
file.delete();
} catch (javax.jms.JMSException e) {
throw e;
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
newE.setLinkedException(e);
throw newE;
}
}
public void add(String queueId, org.jbossmq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
LogInfo logInfo;
synchronized (messageLogs) {
logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
}
if (logInfo == null)
throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
logInfo.log.add(message, txId);
}
public void remove(String queueId, org.jbossmq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
LogInfo logInfo;
synchronized (messageLogs) {
logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
}
if (logInfo == null)
throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
logInfo.log.remove(message, txId);
}
// The directory where persistence data should be stored
URL dataDirURL;
TxManager txManager;
/**
* Insert the method's description here.
* Creation date: (6/27/2001 12:53:12 AM)
* @return java.lang.String
*/
public java.lang.String getDataDirectory() {
return dataDirectory;
}
public String getName() {
return "JBossMQ-PersistenceManager";
}
/**
* getTxManager method comment.
*/
public org.jbossmq.pm.TxManager getTxManager() {
return txManager;
}
public void initService() throws Exception {
URL configFile = getClass().getClassLoader().getResource("jboss.jcml");
dataDirURL = new URL(configFile, dataDirectory);
URL txLogFile = new URL(dataDirURL, "transactions.dat");
spyTxLog = new SpyTxLog(txLogFile.getFile());
//Get an InitialContext
JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {}, new String[] {} );
server.setPersistenceManager(this);
}
public void restore(org.jbossmq.server.JMSServer server) throws
javax.jms.JMSException {
TreeSet commitedTXs = spyTxLog.restore();
HashMap clone;
synchronized (messageLogs) {
clone = (HashMap) messageLogs.clone();
}
Iterator iter = clone.values().iterator();
while (iter.hasNext()) {
LogInfo logInfo = (LogInfo)iter.next();
JMSDestination q =
server.getJMSDestination(logInfo.destination);
SpyMessage rebuild[] = logInfo.log.restore(commitedTXs);
//TODO: make sure this lock is good enough
synchronized (q) {
for (int i = 0; i < rebuild.length; i++) {
q.restoreMessage(rebuild[i], logInfo.queueId);
}
}
}
}
/**
* Insert the method's description here.
* Creation date: (6/27/2001 12:53:12 AM)
* @param newDataDirectory java.lang.String
*/
public void setDataDirectory(java.lang.String newDataDirectory) {
dataDirectory = newDataDirectory;
}
public void startService() throws Exception {
JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {}, new String[] {} );
restore(server);
}
}
1.1
jbossmq/src/main/org/jbossmq/pm/logged/PersistenceManagerMBean.java
Index: PersistenceManagerMBean.java
===================================================================
package org.jbossmq.pm.logged;
/*
* jBoss, the OpenSource EJB server
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
/*
* jBoss, the OpenSource EJB server
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
/*
* jBoss, the OpenSource EJB server
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
/*
* jBoss, the OpenSource EJB server
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
/*
* jBoss, the OpenSource EJB server
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
/**
 * MBean interface for the JBossMQ JMX service.
 *
 * Exposes the data directory of the persistence manager as a read/write
 * management attribute.
 *
 * @author Vincent Sheffer ([EMAIL PROTECTED])
 * @version $Revision: 1.1 $
 */
public interface PersistenceManagerMBean
        extends org.jboss.util.ServiceMBean
{
    // Constants -----------------------------------------------------

    /** Object name string declared for this service. */
    String OBJECT_NAME = ":service=JBossMQ";

    // Public --------------------------------------------------------

    /**
     * @return the configured data directory
     */
    String getDataDirectory();

    /**
     * @param newDataDirectory the data directory to use
     */
    void setDataDirectory(String newDataDirectory);
}
1.1 jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLog.java
Index: SpyMessageLog.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.pm.logged;
import java.io.IOException;
import java.io.Serializable;
import javax.jms.JMSException;
import org.jbossmq.SpyMessage;
/**
* This is used to keep a log of SpyMessages arriving and leaving
* a queue. The log can be used reconstruct the queue in case of
* provider failure. Integrety is kept by the use of an ObjectIntegrityLog.
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public class SpyMessageLog {
/////////////////////////////////////////////////////////////////////
// Attributes
/////////////////////////////////////////////////////////////////////
private ObjectIntegrityLog transactionLog;
private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
/////////////////////////////////////////////////////////////////////
// Helper Inner Classes
/////////////////////////////////////////////////////////////////////
static class MessageAddedRecord implements Serializable {
long messageId;
boolean isTransacted;
long transactionId;
SpyMessage message;
private final static long serialVersionUID = 235726945332013954L;
}
static class MessageRemovedRecord implements Serializable {
boolean isTransacted;
long transactionId;
long messageId;
private final static long serialVersionUID = 235726945332013955L;
}
/////////////////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////////////////
public SpyMessageLog(String fileName) throws JMSException {
try {
transactionLog = new ObjectIntegrityLog(fileName);
} catch ( IOException e ) {
throwJMSException("Could not open the queue's tranaction log:
"+fileName,e);
}
}
/////////////////////////////////////////////////////////////////////
// Public Methods
/////////////////////////////////////////////////////////////////////
synchronized public void close() throws JMSException {
try{
transactionLog.close();
} catch ( IOException e ) {
throwJMSException("Could not close the queue's tranaction
log.",e);
}
}
synchronized public void add( SpyMessage message, Long transactionId ) throws
JMSException {
try{
messageAddedRecord.message = message;
messageAddedRecord.messageId = message.messageId;
if( transactionId == null ) {
messageAddedRecord.isTransacted = false;
} else {
messageAddedRecord.isTransacted = true;
messageAddedRecord.transactionId =
transactionId.longValue();
}
transactionLog.add(messageAddedRecord);
transactionLog.commit();
} catch ( IOException e ) {
throwJMSException("Could not write to the tranaction log.",e);
}
}
synchronized public void remove( SpyMessage message, Long transactionId )
throws JMSException {
try{
messageRemovedRecord.messageId = message.messageId;
if( transactionId == null ) {
messageRemovedRecord.isTransacted = false;
} else {
messageRemovedRecord.isTransacted = true;
messageRemovedRecord.transactionId =
transactionId.longValue();
}
transactionLog.add(messageRemovedRecord);
transactionLog.commit();
} catch ( IOException e ) {
throwJMSException("Could not write to the queue's tranaction
log.",e);
}
}
synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws
JMSException {
java.util.HashMap messageIndex = new java.util.HashMap();
try {
ObjectIntegrityLog.IndexItem objects[] =
transactionLog.toIndex();
for( int i=0; i < objects.length; i++ ) {
Object o = objects[i].record;
if( o instanceof MessageAddedRecord ) {
MessageAddedRecord r = (MessageAddedRecord)o;
r.message.messageId = r.messageId;
if( r.isTransacted && !commited.contains(new
Long(r.transactionId)) ) {
// the TX this message was part of was
not
// commited... so drop this message
continue;
}
messageIndex.put( new Long(r.messageId),
objects[i]);
} else if( o instanceof MessageRemovedRecord ) {
MessageRemovedRecord r =
(MessageRemovedRecord)o;
if( r.isTransacted && !commited.contains(new
Long(r.transactionId)) ) {
// the TX this message was part of was
not
// commited... so drop this message
continue;
}
messageIndex.remove( new Long(r.messageId));
}
}
} catch ( Exception e ) {
throwJMSException("Could not rebuild the queue from the
queue's tranaction log.",e);
}
SpyMessage rc[] = new SpyMessage[messageIndex.size()];
java.util.Iterator iter = messageIndex.values().iterator();
for( int i=0; iter.hasNext(); i++ ) {
ObjectIntegrityLog.IndexItem item =
(ObjectIntegrityLog.IndexItem)iter.next();
rc[i] = ((MessageAddedRecord)item.record).message;
}
return rc;
}
private void throwJMSException(String message, Exception e) throws
JMSException {
JMSException newE = new JMSException(message);
newE.setLinkedException(e);
throw newE;
}
}
1.1 jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLogTester.java
Index: SpyMessageLogTester.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.pm.logged;
import org.jbossmq.*;
/**
* This class was used to perform unit testing on the SpyMessageLog/SpyTxLog
*
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public class SpyMessageLogTester {
/**
* Starts the application.
* @param args an array of command-line arguments
*/
public static void main(java.lang.String[] args) throws Exception {
SpyTxLog tm = new SpyTxLog("SpyTxManager1.dat");
SpyMessageLog log = new SpyMessageLog("SpyMessageLog1.dat");
try{
java.util.TreeSet commited = tm.restore();
SpyMessage[] queue = log.restore(commited);
System.out.println("Recovered :"+queue.length+" message from
the message log");
long maxMessageId=0;
for( int i=0; i < queue.length; i++ ) {
System.out.println(" #"+i+": "+queue[i]);
maxMessageId = Math.max(maxMessageId,
queue[i].messageId );
}
Long tx1 = tm.createTx();
long first = ++maxMessageId;
add(log, first,tx1);
long second = ++maxMessageId;
add(log, second, tx1);
remove(log, first, tx1);
System.out.println("Commiting");
tm.commitTx(tx1);
Long tx2 = tm.createTx();
add(log, first,tx2);
System.out.println("Rolling back");
tm.rollbackTx(tx2);
add(log, second+1, null);
System.exit(0);
} finally {
log.close();
}
}
public static void add(SpyMessageLog log, long messageId, Long txid) throws
Exception {
SpyTextMessage m = new SpyTextMessage();
m.messageId = messageId;
m.setText("Hello World #"+m.messageId);
System.out.println("Adding message: "+m+",tx="+txid);
log.add(m,txid);
}
public static void remove(SpyMessageLog log, long messageId, Long txid) throws
Exception {
SpyTextMessage m = new SpyTextMessage();
m.messageId = messageId;
m.setText("Hello World #"+m.messageId);
System.out.println("Removing message: "+m+", tx="+txid);
log.remove(m,txid);
}
}
1.1 jbossmq/src/main/org/jbossmq/pm/logged/SpyTxLog.java
Index: SpyTxLog.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.pm.logged;
import java.io.Serializable;
import java.io.IOException;
import javax.jms.JMSException;
/**
* This is used to keep a log of commited transactions.
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public class SpyTxLog {
/////////////////////////////////////////////////////////////////////
// Attributes
/////////////////////////////////////////////////////////////////////
private ObjectIntegrityLog transactionLog;
private long nextTransactionId = Long.MIN_VALUE;
/////////////////////////////////////////////////////////////////////
// Constructors
/////////////////////////////////////////////////////////////////////
public SpyTxLog(String fileName) throws JMSException {
try {
transactionLog = new ObjectIntegrityLog(fileName);
} catch (IOException e) {
throwJMSException("Could not open the queue's tranaction log:
" + fileName, e);
}
}
/////////////////////////////////////////////////////////////////////
// Public Methods
/////////////////////////////////////////////////////////////////////
synchronized public void close() throws JMSException {
try{
transactionLog.close();
} catch ( IOException e ) {
throwJMSException("Could not close the queue's tranaction
log.",e);
}
}
synchronized public void commitTx(Long id) throws JMSException {
try {
transactionLog.add(id);
transactionLog.commit();
} catch ( IOException e ) {
throwJMSException("Could not create a new transaction.",e);
}
}
synchronized public Long createTx() throws JMSException {
return new Long(nextTransactionId++);
}
synchronized public java.util.TreeSet restore() throws JMSException {
java.util.TreeSet items=null;
try {
items = transactionLog.toTreeSet();
} catch ( Exception e ) {
throwJMSException("Could not restore the transaction log.",e);
}
long maxId = Long.MIN_VALUE;
java.util.Iterator iter = items.iterator();
while( iter.hasNext() ) {
Long l = (Long)iter.next();
if( l.longValue() > maxId )
maxId = l.longValue();
}
nextTransactionId = maxId+1;
return items;
}
synchronized public void rollbackTx(Long txId) throws JMSException {
}
/////////////////////////////////////////////////////////////////////
// Private Methods
/////////////////////////////////////////////////////////////////////
private void throwJMSException(String message, Exception e) throws
JMSException {
JMSException newE = new JMSException(message);
newE.setLinkedException(e);
throw newE;
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development