User: pkendall
Date: 01/05/15 00:18:01
Added: src/main/org/jbossmq/filepersistence MessageLog.java
PersistenceManager.java TxLog.java
Log:
New file-based persistence package. Each persistent message is stored in its own file.
Revision Changes Path
1.1 jbossmq/src/main/org/jbossmq/filepersistence/MessageLog.java
Index: MessageLog.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.filepersistence;
import java.io.IOException;
import java.io.Serializable;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.io.FileInputStream;
import java.io.ObjectInputStream;
import java.io.File;
import javax.jms.JMSException;
import org.jbossmq.SpyMessage;
/**
* This is used to keep SpyMessages on the disk and is used to reconstruct the
* queue in case of provider failure.
*
* @author: Paul Kendall ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public class MessageLog {
/////////////////////////////////////////////////////////////////////
// Attributes
/////////////////////////////////////////////////////////////////////
private File queueName;
/////////////////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////////////////
public MessageLog(String fileName) throws JMSException {
queueName = new File(fileName);
queueName.mkdir();
}
/////////////////////////////////////////////////////////////////////
// Public Methods
/////////////////////////////////////////////////////////////////////
public void close() throws JMSException {
}
public void add( SpyMessage message, Long transactionId ) throws JMSException {
try{
FileOutputStream file = new FileOutputStream(new File(queueName,
Long.toHexString(message.messageId)+"@"+message.getJMSMessageID()+".msg"));
ObjectOutputStream out = new ObjectOutputStream(file);
out.writeObject(transactionId);
out.writeLong(message.messageId);
out.writeObject(message);
out.flush();
out.close();
} catch ( IOException e ) {
throwJMSException("Could not write to the tranaction log.",e);
}
}
public void remove( SpyMessage message, Long transactionId ) throws
JMSException {
File file = new File(queueName,
Long.toHexString(message.messageId)+"@"+message.getJMSMessageID()+".msg");
// I know this looks silly! But sometimes (but not often) M$ systems
fail
// on the first delete
if(!file.delete()) {
Thread.yield();
if( file.exists() ) {
if(!file.delete())
System.out.println("Failed to delete file: "+file.getAbsolutePath());
}
else {
System.out.println("File was deleted, but delete() failed for:
"+file.getAbsolutePath());
}
}
}
public SpyMessage[] restore(java.util.TreeSet comittingTXs) throws
JMSException {
java.util.TreeMap messageIndex = new java.util.TreeMap();
try {
File[] files = queueName.listFiles();
for( int i=0 ; i<files.length ; i++ ) {
ObjectInputStream in = new ObjectInputStream(new FileInputStream(files[i]));
Long txId = (Long)in.readObject();
if( comittingTXs!=null && comittingTXs.contains(txId) ) {
in.close();
if(!files[i].delete()) {
Thread.sleep(10);
if(!files[i].delete()){
System.out.println("Could not delete file:
"+files[i].getAbsolutePath());
}
}
}
else {
long msgId = in.readLong();
SpyMessage message = (SpyMessage)in.readObject();
in.close();
message.messageId = msgId;
messageIndex.put(new Long(msgId), message);
}
}
} catch ( Exception e ) {
throwJMSException("Could not rebuild the queue from the
queue's tranaction log.",e);
}
SpyMessage rc[] = new SpyMessage[messageIndex.size()];
java.util.Iterator iter = messageIndex.values().iterator();
for( int i=0; iter.hasNext(); i++ )
rc[i] = (SpyMessage)iter.next();
return rc;
}
private void throwJMSException(String message, Exception e) throws
JMSException {
JMSException newE = new JMSException(message);
newE.setLinkedException(e);
throw newE;
}
}
1.1
jbossmq/src/main/org/jbossmq/filepersistence/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.filepersistence;
import javax.jms.JMSException;
import java.net.URL;
import java.util.HashMap;
import java.util.TreeSet;
import java.util.Iterator;
import java.util.LinkedList;
import org.jbossmq.xml.XElement;
import org.jbossmq.server.JMSServer;
import org.jbossmq.server.JMSDestination;
import org.jbossmq.SpyMessage;
import org.jbossmq.SpyDestination;
import org.jbossmq.SpyDistributedConnection;
/**
* This class manages all persistence related services for file based
* persistence.
*
* @author Paul Kendall ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class PersistenceManager extends org.jbossmq.server.PersistenceManager {
// The server this persistence manager is providing service for
JMSServer server;
// The configuration data for the manager.
XElement configElement;
// The directory where persistence data should be stored
URL dataDirectory;
// Log file used to store commited transactions.
TxLog txLog;
// Maps SpyDestinations to SpyMessageLogs
HashMap messageLogs = new HashMap();
// Maps (Long)txIds to LinkedList of AddFile tasks
HashMap transactedTasks = new HashMap();
static class LogInfo {
MessageLog log;
SpyDestination destination;
String queueId;
LogInfo(MessageLog log, SpyDestination destination, String queueId) {
this.log=log;
this.destination=destination;
this.queueId=queueId;
}
}
/**
* PersistenceManager constructor.
*/
public PersistenceManager(JMSServer server, XElement configElement) throws
javax.jms.JMSException {
try {
this.server = server;
this.configElement = configElement;
URL configFile =
getClass().getClassLoader().getResource("jbossmq.xml");
dataDirectory = new URL(configFile,
configElement.getField("DataDirectory"));
URL txLogFile = new URL(dataDirectory, "transactions.dat");
txLog = new TxLog(txLogFile.getFile());
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
newE.setLinkedException(e);
throw newE;
}
}
public void restore() throws javax.jms.JMSException {
TreeSet committingTXs = txLog.restore();
HashMap clone;
synchronized (messageLogs) {
clone = (HashMap) messageLogs.clone();
}
Iterator iter = clone.values().iterator();
while (iter.hasNext()) {
LogInfo logInfo = (LogInfo)iter.next();
JMSDestination q =
server.getJMSDestination(logInfo.destination);
SpyMessage rebuild[] = logInfo.log.restore(committingTXs);
//TODO: make sure this lock is good enough
synchronized (q) {
for (int i = 0; i < rebuild.length; i++) {
q.restoreMessage(rebuild[i], logInfo.queueId);
}
}
}
}
public Long createPersistentTx() throws javax.jms.JMSException {
Long txId = txLog.createTx();
synchronized (transactedTasks) {
transactedTasks.put(txId, new LinkedList());
}
return txId;
}
public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
LinkedList transacted;
synchronized (transactedTasks) {
transacted = (LinkedList)transactedTasks.remove(txId);
}
synchronized(transacted){
Iterator iter = transacted.iterator();
while( iter.hasNext() ) {
Transaction task = (Transaction)iter.next();
task.commit();
}
}
txLog.commitTx(txId);
}
public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
LinkedList transacted;
synchronized(transactedTasks){
transacted = (LinkedList)transactedTasks.remove(txId);
}
synchronized(transacted){
Iterator iter = transacted.iterator();
while( iter.hasNext() ) {
Transaction task = (Transaction)iter.next();
task.rollback();
}
}
txLog.rollbackTx(txId);
}
public void initQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
try {
URL logDir = new URL(dataDirectory,
dest.toString()+"-"+queueId);
MessageLog log = new MessageLog(logDir.getFile());
LogInfo info = new LogInfo(log, dest, queueId);
synchronized(messageLogs){
messageLogs.put(""+dest+"-"+queueId, info);
}
} catch (javax.jms.JMSException e) {
throw e;
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
newE.setLinkedException(e);
throw newE;
}
}
public void destroyQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
try {
URL logDir = new URL(dataDirectory,
dest.toString()+"-"+queueId);
java.io.File file = new java.io.File(logDir.getFile());
LogInfo logInfo;
synchronized(messageLogs){
logInfo = (LogInfo)messageLogs.remove(""+dest+"-"+queueId);
}
if( logInfo == null )
throw new JMSException("The persistence log was never
initialized");
logInfo.log.close();
file.delete();
} catch (javax.jms.JMSException e) {
throw e;
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
newE.setLinkedException(e);
throw newE;
}
}
public void add(String queueId, org.jbossmq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
LogInfo logInfo;
synchronized (messageLogs) {
logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
}
if (logInfo == null)
throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
logInfo.log.add(message, txId);
if( txId != null ) {
LinkedList tasks;
synchronized( transactedTasks ) {
tasks = (LinkedList)transactedTasks.get(txId);
}
if( tasks == null )
throw new javax.jms.JMSException("Transaction is not active 5.");
synchronized(tasks){
tasks.addLast(new Transaction(true, logInfo, message, txId));
}
}
}
public void remove(String queueId, org.jbossmq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
LogInfo logInfo;
synchronized (messageLogs) {
logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
}
if (logInfo == null)
throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
if( txId == null )
logInfo.log.remove(message, txId);
else {
LinkedList tasks;
synchronized (transactedTasks) {
tasks = (LinkedList)transactedTasks.get(txId);
}
if( tasks == null )
throw new javax.jms.JMSException("Transaction is not active 6.");
synchronized(tasks){
tasks.addLast(new Transaction(false, logInfo, message, txId));
}
}
}
class Transaction {
private LogInfo logInfo;
private SpyMessage message;
private Long txId;
private boolean add;
public Transaction(boolean add, LogInfo logInfo, SpyMessage message, Long txId) {
this.add = add;
this.logInfo = logInfo;
this.message = message;
this.txId = txId;
}
public void commit() throws JMSException {
if(!add)
logInfo.log.remove(message, txId);
}
public void rollback() throws JMSException {
if(add)
logInfo.log.remove(message, txId);
}
}
}
1.1 jbossmq/src/main/org/jbossmq/filepersistence/TxLog.java
Index: TxLog.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.filepersistence;
import java.io.Serializable;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.TreeSet;
import java.util.Iterator;
import javax.jms.JMSException;
/**
* This is used to keep a log of active transactions.
* It is used to rollback transactions when the system restarts.
*
* @author: Paul Kendall ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public class TxLog {
/////////////////////////////////////////////////////////////////////
// Attributes
/////////////////////////////////////////////////////////////////////
private RandomAccessFile transactionLog;
private TreeSet transactions = new TreeSet();
private long nextTransactionId = Long.MIN_VALUE;
/////////////////////////////////////////////////////////////////////
// Constructors
/////////////////////////////////////////////////////////////////////
public TxLog(String fileName) throws JMSException {
try {
transactionLog = new RandomAccessFile(fileName, "rw");
} catch (IOException e) {
throwJMSException("Could not open tranaction log: " +
fileName, e);
}
}
/////////////////////////////////////////////////////////////////////
// Public Methods
/////////////////////////////////////////////////////////////////////
synchronized public void close() throws JMSException {
try{
transactionLog.close();
} catch ( IOException e ) {
throwJMSException("Could not close tranaction log.",e);
}
}
synchronized public Long createTx() throws JMSException {
Long id = new Long(nextTransactionId++);
transactions.add(id);
try {
transactionLog.writeLong(nextTransactionId);
transactionLog.writeInt(transactions.size());
for(Iterator iter = transactions.iterator() ; iter.hasNext() ;) {
transactionLog.writeLong(((Long)iter.next()).longValue());
}
transactionLog.seek(0);
} catch ( IOException e ) {
throwJMSException("Could not write transaction log on
commit.",e);
}
return id;
}
synchronized public void commitTx(Long txId) throws JMSException {
try {
transactions.remove(txId);
transactionLog.writeLong(nextTransactionId);
transactionLog.writeInt(transactions.size());
for(Iterator iter = transactions.iterator() ; iter.hasNext() ;) {
transactionLog.writeLong(((Long)iter.next()).longValue());
}
transactionLog.seek(0);
} catch ( IOException e ) {
throwJMSException("Could not write transaction log on
commit.",e);
}
}
synchronized public void rollbackTx(Long txId) throws JMSException {
try {
transactions.remove(txId);
transactionLog.writeLong(nextTransactionId);
transactionLog.writeInt(transactions.size());
for(Iterator iter = transactions.iterator() ; iter.hasNext() ;) {
transactionLog.writeLong(((Long)iter.next()).longValue());
}
transactionLog.seek(0);
} catch ( IOException e ) {
throwJMSException("Could not write transaction log on
rollback.",e);
}
}
synchronized public java.util.TreeSet restore() throws JMSException {
java.util.TreeSet items=null;
try {
if( transactionLog.length() != 0 ) {
nextTransactionId = transactionLog.readLong();
int size = transactionLog.readInt();
if( size > 0 ) {
items = new java.util.TreeSet();
for(int i=0 ; i<size ; i++) {
long txId = transactionLog.readLong();
items.add(new Long(txId));
}
}
}
transactionLog.seek(0);
transactionLog.setLength(0);
} catch ( Exception e ) {
throwJMSException("Could not restore the transaction log.",e);
}
return items;
}
/////////////////////////////////////////////////////////////////////
// Private Methods
/////////////////////////////////////////////////////////////////////
private void throwJMSException(String message, Exception e) throws
JMSException {
JMSException newE = new JMSException(message);
newE.setLinkedException(e);
throw newE;
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development