User: pkendall
Date: 01/08/08 18:18:28
Modified: src/main/org/jbossmq/pm/file MessageLog.java
PersistenceManager.java
Log:
Major updates (especially to topics).
Speed improvements.
Make JVM IL work (by using a singleton JMSServer).
Message Listeners re-implemented using client-side thread.
Revision Changes Path
1.2 +151 -48 jbossmq/src/main/org/jbossmq/pm/file/MessageLog.java
Index: MessageLog.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/file/MessageLog.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- MessageLog.java 2001/07/11 02:52:16 1.1
+++ MessageLog.java 2001/08/09 01:18:28 1.2
@@ -16,18 +16,29 @@
import javax.jms.JMSException;
-import org.jbossmq.SpyMessage;
+import org.jbossmq.*;
/**
* This is used to keep SpyMessages on the disk and is used reconstruct the
* queue in case of provider failure.
*
* @author: Paul Kendall ([EMAIL PROTECTED])
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class MessageLog {
/////////////////////////////////////////////////////////////////////
+ // Constants
+ /////////////////////////////////////////////////////////////////////
+ protected static final byte OBJECT_MESS = 3;
+ protected static final byte BYTES_MESS = 4;
+ protected static final byte MAP_MESS = 5;
+ protected static final byte TEXT_MESS = 6;
+ protected static final byte STREAM_MESS = 7;
+ protected static final byte ENCAP_MESS = 8;
+ protected static final byte SPY_MESS = 9;
+
+ /////////////////////////////////////////////////////////////////////
// Attributes
/////////////////////////////////////////////////////////////////////
private File queueName;
@@ -36,10 +47,97 @@
// Constructor
/////////////////////////////////////////////////////////////////////
public MessageLog(String fileName) throws JMSException {
- queueName = new File(fileName);
- queueName.mkdir();
+ queueName = new File(fileName);
+ queueName.mkdir();
}
+ /////////////////////////////////////////////////////////////////////
+ // Utility Methods
+ /////////////////////////////////////////////////////////////////////
+ protected void delete(File file) throws IOException{
+ // I know this looks silly! But sometimes (but not often) M$ systems
fail
+ // on the first delete
+ if(!file.delete()) {
+ Thread.yield();
+ if( file.exists() ) {
+ if(!file.delete())
+ System.out.println("Failed to delete file: "+file.getAbsolutePath());
+ } else {
+ System.out.println("File was deleted, but delete() failed for:
"+file.getAbsolutePath());
+ }
+ }
+ }
+
+ protected void rename(File from, File to) throws IOException{
+ // I know this looks silly! But sometimes (but not often) M$ systems
fail
+ // on the first rename (as above)
+ if(!from.renameTo(to)) {
+ Thread.yield();
+ if( from.exists() ) {
+ if(!from.renameTo(to))
+ System.out.println("Rename of file "+from.getAbsolutePath()+" to
"+to.getAbsolutePath()+" failed.");
+ } else {
+ System.out.println("Rename of file "+from.getAbsolutePath()+" to
"+to.getAbsolutePath()+" failed but from no longer exists?");
+ }
+ }
+ }
+
+ protected void writeMessageToFile(SpyMessage message,File file) throws
IOException{
+ ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(file));
+ out.writeLong(message.messageId);
+ if(message instanceof SpyEncapsulatedMessage){
+ out.writeByte(ENCAP_MESS);
+ }else if(message instanceof SpyObjectMessage){
+ out.writeByte(OBJECT_MESS);
+ }else if(message instanceof SpyBytesMessage){
+ out.writeByte(BYTES_MESS);
+ }else if(message instanceof SpyMapMessage){
+ out.writeByte(MAP_MESS);
+ }else if(message instanceof SpyTextMessage){
+ out.writeByte(TEXT_MESS);
+ }else if(message instanceof SpyStreamMessage){
+ out.writeByte(STREAM_MESS);
+ }else{
+ out.writeByte(SPY_MESS);
+ }
+ message.writeExternal(out);
+ out.flush();
+ out.close();
+ }
+
+ protected void restoreMessageFromFile(java.util.TreeMap store, File file) throws
Exception{
+ ObjectInputStream in = new ObjectInputStream(new FileInputStream(file));
+ long msgId = in.readLong();
+ SpyMessage message = null;
+ byte type = in.readByte();
+ switch(type){
+ case OBJECT_MESS:
+ message = new SpyObjectMessage();
+ break;
+ case BYTES_MESS:
+ message = new SpyBytesMessage();
+ break;
+ case MAP_MESS:
+ message = new SpyMapMessage();
+ break;
+ case STREAM_MESS:
+ message = new SpyStreamMessage();
+ break;
+ case TEXT_MESS:
+ message = new SpyTextMessage();
+ break;
+ case ENCAP_MESS:
+ message = new SpyEncapsulatedMessage();
+ break;
+ default:
+ message = new SpyMessage();
+ }
+ message.readExternal(in);
+ in.close();
+ message.messageId = msgId;
+ message.persistData = file;
+ store.put(new Long(msgId), message);
+ }
/////////////////////////////////////////////////////////////////////
// Public Methods
@@ -47,63 +145,68 @@
public void close() throws JMSException {
}
- public void add( SpyMessage message, Long transactionId ) throws JMSException {
+ public void add( SpyMessage message, org.jbossmq.pm.Tx transactionId ) throws
JMSException {
try{
- FileOutputStream file = new FileOutputStream(new File(queueName,
Long.toHexString(message.messageId)+"@"+message.getJMSMessageID()+".msg"));
- ObjectOutputStream out = new ObjectOutputStream(file);
- out.writeObject(transactionId);
- out.writeLong(message.messageId);
- out.writeObject(message);
- out.flush();
- out.close();
+ File f;
+ if(transactionId == null)
+ f = new File(queueName, message.getJMSMessageID());
+ else
+ f = new File(queueName, message.getJMSMessageID()+"."+transactionId);
+ writeMessageToFile(message,f);
+ message.persistData = f;
} catch ( IOException e ) {
throwJMSException("Could not write to the tranaction log.",e);
}
}
- public void remove( SpyMessage message, Long transactionId ) throws
JMSException {
- File file = new File(queueName,
Long.toHexString(message.messageId)+"@"+message.getJMSMessageID()+".msg");
+ public void finishAdd( SpyMessage message, org.jbossmq.pm.Tx transactionId )
throws JMSException {
+ }
- // I know this looks silly! But sometimes (but not often) M$ systems
fail
- // on the first delete
- if(!file.delete()) {
- Thread.yield();
- if( file.exists() ) {
- if(!file.delete())
- System.out.println("Failed to delete file: "+file.getAbsolutePath());
- }
- else {
- System.out.println("File was deleted, but delete() failed for:
"+file.getAbsolutePath());
- }
- }
+ public void undoAdd( SpyMessage message, org.jbossmq.pm.Tx transactionId ) throws
JMSException {
+ try{
+ File file = (File)message.persistData;
+ delete(file);
+ } catch ( IOException e ) {
+ throwJMSException("Could not write to the tranaction log.",e);
+ }
+ }
+
+ public void remove( SpyMessage message, org.jbossmq.pm.Tx transactionId )
throws JMSException {
}
+
+ public void finishRemove( SpyMessage message, org.jbossmq.pm.Tx transactionId )
throws JMSException {
+ try{
+ File file = (File)message.persistData;
+ delete(file);
+ } catch ( IOException e ) {
+ throwJMSException("Could not write to the tranaction log.",e);
+ }
+ }
- public SpyMessage[] restore(java.util.TreeSet comittingTXs) throws
JMSException {
+ public void undoRemove( SpyMessage message, org.jbossmq.pm.Tx transactionId )
throws JMSException {
+ }
+ public SpyMessage[] restore(java.util.TreeSet rollBackTXs) throws JMSException
{
+ //use sorted map to get queue order right.
java.util.TreeMap messageIndex = new java.util.TreeMap();
try {
- File[] files = queueName.listFiles();
- for( int i=0 ; i<files.length ; i++ ) {
- ObjectInputStream in = new ObjectInputStream(new
FileInputStream(files[i]));
- Long txId = (Long)in.readObject();
- if( comittingTXs!=null && comittingTXs.contains(txId) ) {
- in.close();
- if(!files[i].delete()) {
- Thread.sleep(10);
- if(!files[i].delete()){
- System.out.println("Could not delete file:
"+files[i].getAbsolutePath());
- }
- }
- }
- else {
- long msgId = in.readLong();
- SpyMessage message = (SpyMessage)in.readObject();
- in.close();
- message.messageId = msgId;
- messageIndex.put(new Long(msgId), message);
- }
- }
+ File[] files = queueName.listFiles();
+ for( int i=0 ; i<files.length ; i++ ) {
+ String fileName = files[i].getName();
+ int extIndex = fileName.indexOf(".");
+ if(extIndex < 0){
+ //non transacted message so simply restore
+ restoreMessageFromFile(messageIndex,files[i]);
+ }else{
+ //test if message from a transaction that is being rolled back.
+ Long tx = new Long(Long.parseLong(fileName.substring(extIndex+1)));
+ if(rollBackTXs.contains(tx))
+ delete(files[i]);
+ else
+ restoreMessageFromFile(messageIndex,files[i]);
+ }
+ }
} catch ( Exception e ) {
throwJMSException("Could not rebuild the queue from the
queue's tranaction log.",e);
}
@@ -116,7 +219,7 @@
}
private void throwJMSException(String message, Exception e) throws
JMSException {
- JMSException newE = new JMSException(message);
+ JMSException newE = new SpyJMSException(message);
newE.setLinkedException(e);
throw newE;
}
1.3 +343 -212 jbossmq/src/main/org/jbossmq/pm/file/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/file/PersistenceManager.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- PersistenceManager.java 2001/07/28 00:33:38 1.2
+++ PersistenceManager.java 2001/08/09 01:18:28 1.3
@@ -13,20 +13,18 @@
import java.util.TreeSet;
import java.util.Iterator;
import java.util.LinkedList;
+import java.io.File;
+import java.io.IOException;
-import org.jbossmq.xml.XElement;
import org.jbossmq.server.JMSServer;
import org.jbossmq.server.JMSDestination;
import org.jbossmq.SpyMessage;
import org.jbossmq.SpyDestination;
+import org.jbossmq.SpyJMSException;
-
-import javax.naming.InitialContext;
import org.jbossmq.pm.TxManager;
import org.jboss.util.ServiceMBeanSupport;
import javax.management.*;
-import java.io.Serializable;
-import org.jbossmq.ConnectionToken;
/**
* This class manages all persistence related services for file based
@@ -34,151 +32,182 @@
*
* @author Paul Kendall ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
-public class PersistenceManager extends ServiceMBeanSupport implements
org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean, MBeanRegistration,
Serializable {
-
+public class PersistenceManager extends ServiceMBeanSupport
+ implements org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean,
MBeanRegistration {
-
- transient private String dataDirectory;
- // Log file used to store commited transactions.
- transient TxLog txLog;
+ // The directory where persistence data should be stored
+ String dataDirectory;
+ URL dataDirURL;
+ File dataDirFile;
+ //tx manager
+ org.jbossmq.pm.TxManager txManager;
// Maps SpyDestinations to SpyMessageLogs
- transient HashMap messageLogs= new HashMap();
+ HashMap messageLogs = new HashMap();
// Maps (Long)txIds to LinkedList of AddFile tasks
- transient HashMap transactedTasks= new HashMap();
+ HashMap transactedTasks = new HashMap();
- static class LogInfo {
+ class TxInfo {
+ File txf;
+ java.io.RandomAccessFile raf;
+ LinkedList tasks = new LinkedList();
+
+ TxInfo() throws JMSException{
+ }
+
+ void setFile(File f) throws JMSException{
+ txf = f;
+ try{
+ raf = new java.io.RandomAccessFile(txf,"rw");
+ }catch(IOException e){
+ JMSException jmse = new SpyJMSException("IO Error create raf for txinfo.");
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ }
+ }
+
+ protected static final int MAX_POOL_SIZE = 50;
+ protected java.util.ArrayList txPool = new java.util.ArrayList();
+
+ protected TxInfo getTxInfo(File f)throws JMSException{
+ TxInfo info;
+ synchronized(txPool){
+ if(txPool.isEmpty()){
+ info = new TxInfo();
+ }else{
+ info = (TxInfo)txPool.remove(txPool.size()-1);
+ }
+ }
+ info.setFile(f);
+ return info;
+ }
+
+ protected void releaseTxInfo(TxInfo info){
+ synchronized(txPool){
+ if(txPool.size() < MAX_POOL_SIZE){
+ info.tasks.clear();
+ txPool.add(info);
+ }
+ }
+ }
+
+ class LogInfo {
MessageLog log;
SpyDestination destination;
LogInfo(MessageLog log, SpyDestination destination) {
- this.log= log;
- this.destination= destination;
- }
- }
-
-
-
-
-
- public Long createPersistentTx() throws javax.jms.JMSException {
- Long txId = txLog.createTx();
- synchronized (transactedTasks) {
- transactedTasks.put(txId, new LinkedList());
- }
- return txId;
- }
-
- public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
-
- LinkedList transacted;
- synchronized (transactedTasks) {
- transacted = (LinkedList)transactedTasks.remove(txId);
- }
- synchronized(transacted){
- Iterator iter = transacted.iterator();
- while( iter.hasNext() ) {
- Transaction task = (Transaction)iter.next();
- task.commit();
- }
- }
-
- txLog.commitTx(txId);
- }
-
- public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
-
- LinkedList transacted;
- synchronized(transactedTasks){
- transacted = (LinkedList)transactedTasks.remove(txId);
- }
- synchronized(transacted){
- Iterator iter = transacted.iterator();
- while( iter.hasNext() ) {
- Transaction task = (Transaction)iter.next();
- task.rollback();
- }
- }
-
- txLog.rollbackTx(txId);
- }
-
-
-
-
-
-
-
-
-
- class Transaction {
- private LogInfo logInfo;
- private SpyMessage message;
- private Long txId;
- private boolean add;
- public Transaction(boolean add, LogInfo logInfo, SpyMessage message,
Long txId) {
- this.add= add;
- this.logInfo= logInfo;
- this.message= message;
- this.txId= txId;
+ this.log=log;
+ this.destination=destination;
}
- public void commit() throws JMSException {
- if (!add)
- logInfo.log.remove(message, txId);
- }
- public void rollback() throws JMSException {
- if (add)
- logInfo.log.remove(message, txId);
- }
}
- // The directory where persistence data should be stored
- transient URL dataDirURL;
- transient TxManager txManager;
-
/**
* PersistenceManager constructor.
*/
public PersistenceManager() throws javax.jms.JMSException {
txManager = new TxManager( this );
}
-
-/**
- * Insert the method's description here.
- * Creation date: (6/27/2001 12:53:12 AM)
- * @return java.lang.String
- */
-public java.lang.String getDataDirectory() {
- return dataDirectory;
-}
-
- public String getName() {
- return "JBossMQ-PersistenceManager";
- }
-
-/**
- * getTxManager method comment.
- */
-public org.jbossmq.pm.TxManager getTxManager() {
- return txManager;
-}
- public void initService() throws java.lang.Exception {
+ public String getName() {
+ return "JBossMQ-PersistenceManager";
+ }
+
+ public void setDataDirectory(java.lang.String newDataDirectory) {
+ dataDirectory = newDataDirectory;
+ }
+
+ public java.lang.String getDataDirectory() {
+ return dataDirectory;
+ }
+
+ public org.jbossmq.pm.TxManager getTxManager() {
+ return txManager;
+ }
+ public void initService() throws Exception {
URL configFile = getClass().getClassLoader().getResource("jboss.jcml");
-
dataDirURL = new URL(configFile, dataDirectory);
- URL txLogFile = new URL(dataDirURL, "transactions.dat");
- txLog = new TxLog(txLogFile.getFile());
-
- JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {}, new String[] {} );
+ dataDirFile = new File(dataDirURL.getFile());
+ JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {}, new String[] {} );
server.setPersistenceManager(this);
}
+ public void startService() throws Exception {
+ JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {}, new String[] {} );
+ restore(server);
+ }
+
+ protected boolean testRollBackTx(Long tx, java.util.ArrayList removingMessages)
throws IOException{
+ //checks to see if this tx was in the middle of committing.
+ //If it was finish commit and return false else return true.
+ HashMap clone;
+ synchronized (messageLogs) {
+ clone = (HashMap) messageLogs.clone();
+ }
+
+ java.util.ArrayList files = new java.util.ArrayList();
+ boolean foundAll = true;
+ for(int i=0;i<removingMessages.size();i++){
+ String fileName = removingMessages.get(i)+"."+tx;
+ boolean found = false;
+ for(Iterator it = clone.keySet().iterator();!found && it.hasNext();){
+ String dirName = (String)it.next();
+ File dir = new File(dataDirFile,dirName);
+ File [] messageFiles = dir.listFiles();
+ for(int j=0;j<messageFiles.length;++j){
+ if(messageFiles[j].getName().equals(fileName)){
+ found = true;
+ files.add(messageFiles[j]);
+ break;
+ }
+ }
+ }
+ if(!found){
+ foundAll = false;
+ }
+ }
+ if(!foundAll){
+ //tx being committed so need to finish it by deleting files.
+ for(int i=0;i<files.size();++i){
+ File f = (File)files.get(i);
+ if(!f.delete()){
+ Thread.yield();
+ //try again
+ if(!f.delete()){
+ throw new IOException("Could not delete file "+f.getAbsolutePath());
+ }
+ }
+ }
+ return false;
+ }
+ return true;
+ }
+
public void restore(JMSServer server) throws javax.jms.JMSException {
+ //reconstruct TXs
+ TreeSet txs = new TreeSet();
+ File [] transactFiles = dataDirFile.listFiles();
+ for(int i=0;i<transactFiles.length;i++){
+ try{
+ Long tx = new Long(Long.parseLong(transactFiles[i].getName()));
+ java.util.ArrayList removingMessages = readTxFile(transactFiles[i]);
+ if(testRollBackTx(tx,removingMessages))
+ txs.add(tx);
+ }catch(NumberFormatException e){
+ System.out.println("Ignoring invalid transaction record file
"+transactFiles[i].getAbsolutePath());
+ transactFiles[i] = null;
+ }catch(IOException e){
+ JMSException jmse = new SpyJMSException("IO Error when restoring.");
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ }
- TreeSet committingTXs = txLog.restore();
+ if(!txs.isEmpty())
+ this.tidcounter = ((Long)txs.last()).longValue()+1;
+
HashMap clone;
synchronized (messageLogs) {
clone = (HashMap) messageLogs.clone();
@@ -186,85 +215,55 @@
Iterator iter = clone.values().iterator();
while (iter.hasNext()) {
-
LogInfo logInfo = (LogInfo)iter.next();
-
JMSDestination q =
server.getJMSDestination(logInfo.destination);
-
- SpyMessage rebuild[] = logInfo.log.restore(committingTXs);
-
+ SpyMessage rebuild[] = logInfo.log.restore(txs);
//TODO: make sure this lock is good enough
synchronized (q) {
for (int i = 0; i < rebuild.length; i++) {
+ if(logInfo.destination instanceof org.jbossmq.SpyTopic)
+ rebuild[i].durableSubscriberID =
((org.jbossmq.SpyTopic)logInfo.destination).getDurableSubscriptionID();
q.restoreMessage(rebuild[i]);
}
}
}
+ //all txs now committed or rolled back so delete tx files
+ for(int i=0;i<transactFiles.length;i++){
+ if(transactFiles[i] != null)
+ deleteTxFile(transactFiles[i]);
+ }
}
-
-/**
- * Insert the method's description here.
- * Creation date: (6/27/2001 12:53:12 AM)
- * @param newDataDirectory java.lang.String
- */
-public void setDataDirectory(java.lang.String newDataDirectory) {
- dataDirectory = newDataDirectory;
-}
-
- public void startService() throws Exception {
-
- JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {}, new String[] {} );
- restore(server);
-
- }
-
- public void add(org.jbossmq.SpyMessage message, Long txId) throws
javax.jms.JMSException {
-
- LogInfo logInfo;
-
- synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination());
- }
- if (logInfo == null) {
- category.debug("Destination was not initialized : "+
message.getJMSDestination());
- throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
+ public void initQueue( SpyDestination dest ) throws javax.jms.JMSException {
+ try {
+ URL logDir = new URL(dataDirURL, dest.toString());
+ MessageLog log = new MessageLog(logDir.getFile());
+ LogInfo info = new LogInfo(log, dest);
+ synchronized(messageLogs){
+ messageLogs.put(dest.toString(), info);
+ }
+ } catch (javax.jms.JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
+ newE.setLinkedException(e);
+ throw newE;
}
-
- logInfo.log.add(message, txId);
-
- if( txId != null ) {
- LinkedList tasks;
- synchronized( transactedTasks ) {
- tasks = (LinkedList)transactedTasks.get(txId);
- }
- if( tasks == null )
- throw new javax.jms.JMSException("Transaction is not active 5.");
- synchronized(tasks){
- tasks.addLast(new Transaction(true, logInfo, message, txId));
- }
- }
-
}
-
- public void destroyQueue( SpyDestination dest) throws javax.jms.JMSException {
+ public void destroyQueue( SpyDestination dest ) throws javax.jms.JMSException {
try {
-
URL logDir = new URL(dataDirURL, dest.toString());
java.io.File file = new java.io.File(logDir.getFile());
-
- LogInfo logInfo;
- synchronized(messageLogs){
- logInfo = (LogInfo)messageLogs.remove(""+dest);
- }
+ LogInfo logInfo;
+ synchronized(messageLogs){
+ logInfo = (LogInfo)messageLogs.remove(dest.toString());
+ }
if( logInfo == null )
throw new JMSException("The persistence log was never
initialized");
-
logInfo.log.close();
file.delete();
-
} catch (javax.jms.JMSException e) {
throw e;
} catch (Exception e) {
@@ -272,57 +271,189 @@
newE.setLinkedException(e);
throw newE;
}
-
}
-
- public void initQueue(SpyDestination dest) throws javax.jms.JMSException {
- try {
+ protected long tidcounter = Long.MIN_VALUE;
- URL logDir= new URL(dataDirURL, dest.toString());
- MessageLog log= new MessageLog(logDir.getFile());
+ public org.jbossmq.pm.Tx createPersistentTx() throws javax.jms.JMSException {
+ org.jbossmq.pm.Tx txId = null;
+ synchronized (transactedTasks) {
+ txId = new org.jbossmq.pm.Tx(tidcounter++);
+ transactedTasks.put(txId, getTxInfo(createTxFile(txId)));
+ }
+ return txId;
+ }
- LogInfo info= new LogInfo(log, dest);
+ protected File createTxFile(org.jbossmq.pm.Tx txId) throws javax.jms.JMSException
{
+ try{
+ File file = new File(dataDirFile,txId.toString());
+ if(!file.createNewFile())
+ throw new javax.jms.JMSException("Error creating tx file.");
+ return file;
+ }catch(IOException e){
+ JMSException newE = new SpyJMSException("Unable to create committing
transaction record.");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+ }
+
+ protected void deleteTxFile(File file) throws javax.jms.JMSException {
+ if(!file.delete()){
+ Thread.yield();
+ if(file.exists() && !file.delete()){
+ throw new javax.jms.JMSException("Unable to delete committing transaction
record.");
+ }
+ }
+ }
+
+ protected java.util.ArrayList readTxFile(File file) throws javax.jms.JMSException
{
+ try{
+ java.util.ArrayList result = new java.util.ArrayList();
+ java.io.RandomAccessFile raf = new java.io.RandomAccessFile(file,"r");
+ try{
+ while(true){
+ result.add(raf.readUTF());
+ }
+ }catch(java.io.EOFException e){
+ }
+ raf.close();
+ return result;
+ }catch(IOException e){
+ JMSException newE = new SpyJMSException("Unable to read committing
transaction record.");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+ }
- category.debug("Initializing persistence for destination: "+ dest);
- synchronized (messageLogs) {
- messageLogs.put("" + dest, info);
- }
+ public void commitPersistentTx(org.jbossmq.pm.Tx txId) throws
javax.jms.JMSException {
+ TxInfo info;
+ synchronized (transactedTasks) {
+ info = (TxInfo)transactedTasks.remove(txId);
+ }
+ //ensure record of tx exists
+ try{
+ info.raf.close();
+ }catch(IOException e){
+ JMSException jmse = new SpyJMSException("IO Error when closing raf for tx.");
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ synchronized(info.tasks){
+ Iterator iter = info.tasks.iterator();
+ while( iter.hasNext() ) {
+ Transaction task = (Transaction)iter.next();
+ task.commit();
+ }
+ }
+ deleteTxFile(info.txf);
+ releaseTxInfo(info);
+ }
- } catch (javax.jms.JMSException e) {
- throw e;
- } catch (Exception e) {
- javax.jms.JMSException newE= new javax.jms.JMSException("Invalid
configuration.");
- newE.setLinkedException(e);
- throw newE;
- }
+ public void rollbackPersistentTx(org.jbossmq.pm.Tx txId) throws
javax.jms.JMSException {
+ TxInfo info;
+ synchronized (transactedTasks) {
+ info = (TxInfo)transactedTasks.remove(txId);
+ }
+ //ensure record of tx exists
+ try{
+ info.raf.close();
+ }catch(IOException e){
+ JMSException jmse = new SpyJMSException("IO Error when closing raf for tx.");
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ synchronized(info.tasks){
+ Iterator iter = info.tasks.iterator();
+ while( iter.hasNext() ) {
+ Transaction task = (Transaction)iter.next();
+ task.rollback();
+ }
+ }
+ deleteTxFile(info.txf);
+ releaseTxInfo(info);
+ }
+ public void add(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId) throws
javax.jms.JMSException {
+ LogInfo logInfo;
+ synchronized (messageLogs) {
+ logInfo = (LogInfo)
messageLogs.get(message.getJMSDestination().toString());
+ }
+ if (logInfo == null)
+ throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
+ logInfo.log.add(message, txId);
+ if( txId == null){
+ logInfo.log.finishAdd(message, txId);
+ }else {
+ TxInfo info;
+ synchronized( transactedTasks ) {
+ info = (TxInfo)transactedTasks.get(txId);
+ }
+ if( info == null )
+ throw new javax.jms.JMSException("Transaction is not active 5.");
+ synchronized(info.tasks){
+ info.tasks.addLast(new Transaction(true, logInfo, message, txId));
+ }
+ }
}
- public void remove(org.jbossmq.SpyMessage message, Long txId) throws
javax.jms.JMSException {
+ public void remove(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId)
throws javax.jms.JMSException {
LogInfo logInfo;
synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination());
+ logInfo = (LogInfo)
messageLogs.get(message.getJMSDestination().toString());
}
if (logInfo == null)
throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
- if( txId == null )
- logInfo.log.remove(message, txId);
- else {
- LinkedList tasks;
- synchronized (transactedTasks) {
- tasks = (LinkedList)transactedTasks.get(txId);
- }
- if( tasks == null )
- throw new javax.jms.JMSException("Transaction is not active 6.");
- synchronized(tasks){
- tasks.addLast(new Transaction(false, logInfo, message, txId));
- }
- }
+ logInfo.log.remove(message, txId);
+ if( txId == null )
+ logInfo.log.finishRemove(message, txId);
+ else {
+ TxInfo info;
+ synchronized( transactedTasks ) {
+ info = (TxInfo)transactedTasks.get(txId);
+ }
+ if( info == null )
+ throw new javax.jms.JMSException("Transaction is not active 6.");
+ try{
+ info.raf.writeUTF(message.getJMSMessageID());
+ }catch(IOException e){
+ JMSException jmse = new SpyJMSException("IO Error when recording remove in
txs raf.");
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ synchronized(info.tasks){
+ info.tasks.addLast(new Transaction(false, logInfo, message, txId));
+ }
+ }
}
+
+ class Transaction {
+ private LogInfo logInfo;
+ private SpyMessage message;
+ private org.jbossmq.pm.Tx txId;
+ private boolean add;
+ public Transaction(boolean add, LogInfo logInfo, SpyMessage message,
org.jbossmq.pm.Tx txId) {
+ this.add = add;
+ this.logInfo = logInfo;
+ this.message = message;
+ this.txId = txId;
+ }
+ public void commit() throws JMSException {
+ if(add)
+ logInfo.log.finishAdd(message, txId);
+ else
+ logInfo.log.finishRemove(message, txId);
+ }
+ public void rollback() throws JMSException {
+ if(add)
+ logInfo.log.undoAdd(message, txId);
+ else
+ logInfo.log.undoRemove(message, txId);
+ }
+ }
+
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development