User: pkendall Date: 01/07/31 13:17:52 Added: src/main/org/jbossmq/pm/rollinglogged IntegrityLog.java ObjectIntegrityLog.java PersistenceManager.java PersistenceManagerMBean.java SpyMessageLog.java SpyTxLog.java Log: new persistence mechanism Revision Changes Path 1.1 jbossmq/src/main/org/jbossmq/pm/rollinglogged/IntegrityLog.java Index: IntegrityLog.java =================================================================== /* * JBossMQ, the OpenSource JMS implementation * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jbossmq.pm.rollinglogged; import java.io.RandomAccessFile; import java.io.OutputStream; import java.io.InputStream; import java.io.IOException; import java.io.File; /** * This class is used to create a log file which will guarantee * its integrity up to the last commit point. * * The InputStream returned by getInputStream() will read * data placed into the log with the OutputStream returned by * getOutputStream(). The EOF for the InputStream is the * last committed point of the OutputStream. 
* * @author: Hiram Chirino ([EMAIL PROTECTED]) * @version $Revision: 1.1 $ */ public class IntegrityLog { ///////////////////////////////////////////////////////////////////// // Attributes ///////////////////////////////////////////////////////////////////// private static final int HEADER_SIZE=16; // in bytes // Header related stuff private long firstRecordPos; private long nextRecordPos; private byte headerBytes[]=new byte[HEADER_SIZE]; private RandomAccessFile raf; private File f; private LogOutputStream outputStream; private LogInputStream inputStream; ///////////////////////////////////////////////////////////////////// // Helper Inner Classes ///////////////////////////////////////////////////////////////////// class LogInputStream extends InputStream { boolean closed = false; long inputPos = 0; public long getFilePointer() { return inputPos; } public void close() throws IOException { super.close(); closed = true; } public int read() throws IOException { inputPos = Math.max(inputPos, firstRecordPos); int rc = IntegrityLog.this.read(inputPos); if( rc >= 0 ) inputPos ++; return rc; } public int read(byte bytes[], int off, int len) throws IOException { inputPos = Math.max(inputPos, firstRecordPos); int rc = IntegrityLog.this.read(inputPos, bytes, off, len); if( rc >= 0 ) inputPos += rc; return rc; } } class LogOutputStream extends OutputStream { boolean closed = false; public long getFilePointer() { return nextRecordPos; } public void close() throws IOException { super.close(); closed = true; } public void write(int b) throws IOException { IntegrityLog.this.write( (byte)b ); } public void write(byte bytes[], int off, int len) throws IOException { IntegrityLog.this.write( bytes, off, len ); } } ///////////////////////////////////////////////////////////////////// // Constructor ///////////////////////////////////////////////////////////////////// public IntegrityLog(String fileName) throws IOException { f = new File(fileName); boolean exists = f.isFile(); raf = 
new RandomAccessFile(f, "rw"); if( exists ) { loadHeader(); } else { initHeader(); } } ///////////////////////////////////////////////////////////////////// // Public Methods ///////////////////////////////////////////////////////////////////// public LogInputStream getInputStream() { if ( inputStream==null || inputStream.closed ) { inputStream = new LogInputStream(); } return inputStream; } public LogOutputStream getOutputStream() throws IOException { if ( outputStream==null || outputStream.closed ) { outputStream = new LogOutputStream(); } return outputStream; } public void commit() throws IOException { headerBytes[0] = (byte)((firstRecordPos >>> 56) & 0xFF); headerBytes[1] = (byte)((firstRecordPos >>> 48) & 0xFF); headerBytes[2] = (byte)((firstRecordPos >>> 40) & 0xFF); headerBytes[3] = (byte)((firstRecordPos >>> 32) & 0xFF); headerBytes[4] = (byte)((firstRecordPos >>> 24) & 0xFF); headerBytes[5] = (byte)((firstRecordPos >>> 16) & 0xFF); headerBytes[6] = (byte)((firstRecordPos >>> 8) & 0xFF); headerBytes[7] = (byte)((firstRecordPos >>> 0) & 0xFF); headerBytes[8] = (byte)((nextRecordPos >>> 56) & 0xFF); headerBytes[9] = (byte)((nextRecordPos >>> 48) & 0xFF); headerBytes[10] =(byte)((nextRecordPos >>> 40) & 0xFF); headerBytes[11] =(byte)((nextRecordPos >>> 32) & 0xFF); headerBytes[12] =(byte)((nextRecordPos >>> 24) & 0xFF); headerBytes[13] =(byte)((nextRecordPos >>> 16) & 0xFF); headerBytes[14] =(byte)((nextRecordPos >>> 8) & 0xFF); headerBytes[15] =(byte)((nextRecordPos >>> 0) & 0xFF); raf.seek(0); raf.write(headerBytes); } public void rollback() throws IOException { loadHeader(); } public void delete() throws IOException { f.delete(); } public void close() throws IOException { raf.close(); raf = null; } ///////////////////////////////////////////////////////////////////// // Private Methods ///////////////////////////////////////////////////////////////////// private long getBytesLeft(long offset) { return nextRecordPos-offset; } private void initHeader() throws 
IOException { firstRecordPos = HEADER_SIZE; nextRecordPos = HEADER_SIZE; commit(); } private void loadHeader() throws IOException { raf.seek(0); firstRecordPos = raf.readLong(); nextRecordPos = raf.readLong(); } private int read(long offset) throws IOException { if( offset >= nextRecordPos ) return -1; if( raf.getFilePointer() != offset ) { raf.seek(offset); } int rc = raf.read(); return rc; } private int read(long offset, byte bytes[], int off, int len) throws IOException { if( offset >= nextRecordPos ) return -1; len = (int)Math.min(len, getBytesLeft(offset)); if( raf.getFilePointer() != offset ) { raf.seek(offset); } int rc = raf.read(bytes, off, len); return rc; } private void write(byte []record, int off, int len) throws IOException { if( raf.getFilePointer() != nextRecordPos ) { raf.seek(nextRecordPos); } raf.write(record, off, len); nextRecordPos+=len; } private void write(byte b) throws IOException { if( raf.getFilePointer() != nextRecordPos ) { raf.seek(nextRecordPos); } raf.write(b); nextRecordPos++; } } 1.1 jbossmq/src/main/org/jbossmq/pm/rollinglogged/ObjectIntegrityLog.java Index: ObjectIntegrityLog.java =================================================================== /* * JBossMQ, the OpenSource JMS implementation * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jbossmq.pm.rollinglogged; import java.io.InputStream; import java.io.OutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.IOException; import java.io.Serializable; import java.io.BufferedInputStream; import javax.jms.JMSException; /** * This is used to keep a log of Serializable Objects with garanteed integrety. * * Every object add()ed to the log without an exception is garenteed * to be recovered by any of the to*() methods. The log file will not be * corrupted if the process dies in the middle of an add(). 
* * @author: Hiram Chirino ([EMAIL PROTECTED]) * @version $Revision: 1.1 $ */ public class ObjectIntegrityLog { ///////////////////////////////////////////////////////////////////// // Attributes ///////////////////////////////////////////////////////////////////// private IntegrityLog.LogOutputStream logOutputStream; private ObjectOutputStream out; private IntegrityLog transactionLog; static class IndexItem { long recordOffset; Object record; } ///////////////////////////////////////////////////////////////////// // Helper Inner classes ///////////////////////////////////////////////////////////////////// static class MyObjectOutputStream extends ObjectOutputStream { MyObjectOutputStream(OutputStream os) throws IOException { super(os); } /** * diable the writing of the stream header. */ protected void writeStreamHeader() { } } static class MyObjectInputStream extends ObjectInputStream { MyObjectInputStream(InputStream is) throws IOException { super(is); } /** * diable the reading of the stream header. 
*/ protected void readStreamHeader() { } } ///////////////////////////////////////////////////////////////////// // Constructor ///////////////////////////////////////////////////////////////////// public ObjectIntegrityLog(String fileName) throws IOException { transactionLog = new IntegrityLog(fileName); logOutputStream = transactionLog.getOutputStream(); out = new MyObjectOutputStream(logOutputStream); } ///////////////////////////////////////////////////////////////////// // Public Methods ///////////////////////////////////////////////////////////////////// public void commit() throws IOException { transactionLog.commit(); } public void rollback() throws IOException { transactionLog.rollback(); } public void delete() throws IOException { transactionLog.delete(); } public void close() throws IOException { transactionLog.close(); } public IndexItem add(Object o) throws IOException { IndexItem item = new IndexItem(); item.record = o; item.recordOffset = logOutputStream.getFilePointer(); out.writeObject(o); out.reset(); out.flush(); return item; } public Object[] toArray() throws IOException, ClassNotFoundException { java.util.LinkedList ll = new java.util.LinkedList(); ObjectInputStream in = new MyObjectInputStream(new BufferedInputStream(transactionLog.getInputStream())); try { while (true) { Object o = in.readObject(); ll.addLast(o); } } catch (java.io.EOFException e) { } in.close(); Object rc[] = new Object[ll.size()]; return (Object[]) ll.toArray(rc); } public java.util.HashSet toHashSet() throws IOException, ClassNotFoundException { java.util.HashSet hash = new java.util.HashSet(); ObjectInputStream in = new MyObjectInputStream(new BufferedInputStream(transactionLog.getInputStream())); try { while (true) { Object o = in.readObject(); hash.add(o); } } catch (java.io.EOFException e) { } in.close(); return hash; } public IndexItem[] toIndex() throws IOException, ClassNotFoundException { java.util.LinkedList ll = new java.util.LinkedList(); 
IntegrityLog.LogInputStream logStream = transactionLog.getInputStream(); ObjectInputStream in = new MyObjectInputStream(logStream); try { while (true) { IndexItem i = new IndexItem(); i.recordOffset = logStream.getFilePointer(); i.record = in.readObject(); ll.addLast(i); } } catch (java.io.EOFException e) { } in.close(); IndexItem rc[] = new IndexItem[ll.size()]; return (IndexItem[]) ll.toArray(rc); } public java.util.TreeSet toTreeSet() throws IOException, ClassNotFoundException { java.util.TreeSet treeSet = new java.util.TreeSet(); ObjectInputStream in = new MyObjectInputStream(new BufferedInputStream(transactionLog.getInputStream())); try { while (true) { Object o = in.readObject(); treeSet.add(o); } } catch (java.io.EOFException e) { } in.close(); return treeSet; } public java.util.Vector toVector() throws IOException, ClassNotFoundException { java.util.Vector vector = new java.util.Vector(); ObjectInputStream in = new MyObjectInputStream(new BufferedInputStream(transactionLog.getInputStream())); try { while (true) { Object o = in.readObject(); vector.add(o); } } catch (java.io.EOFException e) { } in.close(); return vector; } } 1.1 jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManager.java Index: PersistenceManager.java =================================================================== package org.jbossmq.pm.rollinglogged; import javax.jms.JMSException; import java.net.URL; import java.util.HashMap; import java.util.TreeSet; import java.util.Iterator; import java.util.LinkedList; import java.util.HashSet; import org.jbossmq.xml.XElement; import org.jbossmq.server.JMSServer; import org.jbossmq.server.JMSDestination; import org.jbossmq.SpyMessage; import org.jbossmq.SpyDestination; import javax.naming.InitialContext; import org.jbossmq.pm.TxManager; import org.jboss.util.ServiceMBeanSupport; import javax.management.*; import org.jbossmq.ConnectionToken; /** * This class manages all persistence related services. 
* * @author David Maplesden * * @version $Revision: 1.1 $ */ public class PersistenceManager extends ServiceMBeanSupport implements org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean, MBeanRegistration { public static final int ROLL_OVER_SIZE = 1000; public static final String TRANS_FILE_NAME = "transactions.dat"; public static final boolean DEBUG = false; /** * NewPersistenceManager constructor. */ public PersistenceManager() throws javax.jms.JMSException { txManager = new TxManager( this ); } private String dataDirectory; int numRollOvers = 0; HashMap queues = new HashMap(); // Log file used to store commited transactions. SpyTxLog currentTxLog; long nextTxId = Long.MIN_VALUE; // Maps txLogs to Maps of SpyDestinations to SpyMessageLogs HashMap messageLogs = new HashMap(); // Maps transactionIds to txInfos HashMap transToTxLogs = new HashMap(); static class LogInfo { SpyMessageLog log; SpyDestination destination; int liveMessages = 0; SpyTxLog txLog; LogInfo(SpyMessageLog log, SpyDestination destination, SpyTxLog txLog) { this.log=log; this.destination=destination; this.txLog = txLog; } } static class TxInfo { Long txId; LinkedList addMessages = new LinkedList(); LinkedList ackMessages = new LinkedList(); SpyTxLog log; TxInfo(Long txId,SpyTxLog log){ this.txId = txId; this.log = log; } } public Long createPersistentTx() throws javax.jms.JMSException { Long txId = null; SpyTxLog txLog = currentTxLog; synchronized(transToTxLogs){ txId = new Long(++nextTxId); transToTxLogs.put(txId,new TxInfo(txId,txLog)); } txLog.createTx(); return txId; } public void commitPersistentTx(Long txId) throws javax.jms.JMSException { TxInfo info = null; LinkedList messagesToDelete = null; synchronized(transToTxLogs){ info = (TxInfo)transToTxLogs.remove(txId); messagesToDelete = info.ackMessages; } deleteMessages(messagesToDelete); info.log.commitTx(txId); checkCleanup(info.log); } public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException { TxInfo info = null; 
LinkedList messagesToDelete = null; synchronized(transToTxLogs){ info = (TxInfo)transToTxLogs.remove(txId); messagesToDelete = info.addMessages; } deleteMessages(messagesToDelete); info.log.rollbackTx(txId); checkCleanup(info.log); } protected void deleteMessages(LinkedList messages) throws javax.jms.JMSException{ for(Iterator it = messages.iterator();it.hasNext();){ LogInfo info = ((LogInfo)((SpyMessage)it.next()).persistData); synchronized(info){ --info.liveMessages; } checkCleanup(info.txLog); } } protected int messageCounter = 0; protected void checkRollOver() throws JMSException{ synchronized(queues){ int max = queues.size(); if(max == 0) max = ROLL_OVER_SIZE; else max *= ROLL_OVER_SIZE; if(++messageCounter > max){ messageCounter = 0; rollOverLogs(); } } } protected void rollOverLogs() throws JMSException{ try{ HashMap logs = new HashMap(); ++numRollOvers; SpyTxLog newTxLog = new SpyTxLog(new URL(dataDirURL,TRANS_FILE_NAME+numRollOvers).getFile()); for(Iterator it = queues.values().iterator();it.hasNext();){ SpyDestination dest = (SpyDestination)it.next(); SpyMessageLog log = new SpyMessageLog(new URL(dataDirURL, dest.toString()+".dat"+numRollOvers).getFile()); LogInfo logInfo = new LogInfo(log, dest, newTxLog); logs.put(""+dest,logInfo); } SpyTxLog oldLog = currentTxLog; synchronized(messageLogs){ currentTxLog = newTxLog; messageLogs.put(newTxLog,logs); } checkCleanup(oldLog); }catch(java.net.MalformedURLException e){ JMSException jme = new JMSException("Error rolling over logs to new files."); jme.setLinkedException(e); throw jme; } } protected void checkCleanup(SpyTxLog txLog) throws JMSException{ if(txLog == currentTxLog) return; HashMap logs; synchronized(messageLogs){ logs = (HashMap) messageLogs.get(txLog); } synchronized(logs){ //if no live messages and no live transactions then cleanup for(Iterator it = logs.values().iterator();it.hasNext();){ LogInfo info = (LogInfo)it.next(); synchronized(info){ if(info.liveMessages != 0) return; } } } 
if(!txLog.completed()){ return; } if(DEBUG) System.out.println("Cleaning up"); //close and delete all logs, remove data from data structures. synchronized(messageLogs){ logs = (HashMap)messageLogs.remove(txLog); } if(logs == null) return; txLog.close(); txLog.delete(); for(Iterator it = logs.values().iterator();it.hasNext();){ LogInfo info = (LogInfo)it.next(); info.log.close(); info.log.delete(); } } public void initQueue( SpyDestination dest ) throws javax.jms.JMSException { String key = ""+dest; queues.put(key,dest); } public void destroyQueue( SpyDestination dest ) throws javax.jms.JMSException { try { String key = ""+dest; queues.remove(key); SpyMessageLog log = null; HashMap logs; synchronized(messageLogs){ logs = (HashMap)messageLogs.get(currentTxLog); } synchronized(logs){ log = (SpyMessageLog)logs.remove(key); } if( log == null ) throw new JMSException("The persistence log was never initialized"); log.close(); log.delete(); HashSet deleteLogs = new HashSet(); synchronized(messageLogs){ for(Iterator it = messageLogs.values().iterator();it.hasNext();){ logs = (HashMap) it.next(); synchronized(logs){ log = (SpyMessageLog)logs.remove(key); } if(log != null){ deleteLogs.add(log); } } } for(Iterator it=deleteLogs.iterator();it.hasNext();){ log = (SpyMessageLog)it.next(); log.close(); log.delete(); } } catch (javax.jms.JMSException e) { throw e; } catch (Exception e) { javax.jms.JMSException newE = new javax.jms.JMSException("Invalid configuration."); newE.setLinkedException(e); throw newE; } } public void add(org.jbossmq.SpyMessage message, Long txId) throws javax.jms.JMSException { LogInfo logInfo; SpyTxLog txLog = null; if(txId == null){ txLog = currentTxLog; }else{ synchronized(transToTxLogs){ txLog = ((TxInfo)transToTxLogs.get(txId)).log; } } HashMap logs; synchronized (messageLogs) { logs = (HashMap)messageLogs.get(txLog); } synchronized(logs){ logInfo = (LogInfo) logs.get(""+message.getJMSDestination()); } if (logInfo == null) throw new 
javax.jms.JMSException("Destination was not initalized with the PersistenceManager"); synchronized(logInfo){ logInfo.liveMessages++; message.persistData = logInfo; logInfo.log.add(message, txId); } if(txId != null){ synchronized(transToTxLogs){ TxInfo txInfo = (TxInfo)transToTxLogs.get(txId); txInfo.addMessages.add(message); } } checkRollOver(); } public void remove(org.jbossmq.SpyMessage message, Long txId) throws javax.jms.JMSException { LogInfo logInfo; SpyTxLog txLog = ((LogInfo)message.persistData).txLog; synchronized (messageLogs) { HashMap logs = (HashMap)messageLogs.get(txLog); logInfo = (LogInfo) logs.get(""+message.getJMSDestination()); } if (logInfo == null) throw new javax.jms.JMSException("Destination was not initalized with the PersistenceManager"); synchronized(logInfo.log){ logInfo.log.remove(message, txId); } if(txId != null){ synchronized(transToTxLogs){ TxInfo txInfo = (TxInfo)transToTxLogs.get(txId); txInfo.ackMessages.add(message); } } if(txId == null){ synchronized(logInfo){ --logInfo.liveMessages; } checkCleanup(txLog); } } // The directory where persistence data should be stored URL dataDirURL; TxManager txManager; /** * Insert the method's description here. * Creation date: (6/27/2001 12:53:12 AM) * @return java.lang.String */ public java.lang.String getDataDirectory() { return dataDirectory; } public String getName() { return "JBossMQ-PersistenceManager"; } /** * getTxManager method comment. 
*/ public org.jbossmq.pm.TxManager getTxManager() { return txManager; } public void initService() throws Exception { if(DEBUG) System.out.println("Using new rolling logged persistence manager."); URL configFile = getClass().getClassLoader().getResource("jboss.jcml"); dataDirURL = new URL(configFile, dataDirectory); //Get an InitialContext JMSServer server = (JMSServer)getServer().invoke(new ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] {}, new String[] {} ); server.setPersistenceManager(this); } public void restore(org.jbossmq.server.JMSServer server) throws javax.jms.JMSException { TreeSet commitedTxs = new TreeSet(); HashMap txLogs = new HashMap(); java.io.File dir = new java.io.File(dataDirURL.getFile()); java.io.File [] dataFiles = dir.listFiles(); for(int i=0;i<dataFiles.length;++i){ String name = dataFiles[i].getName(); if(name.startsWith(TRANS_FILE_NAME)){ int rollOver = name.charAt(name.length()-1) - '0'; numRollOvers = Math.max(numRollOvers,rollOver+1); SpyTxLog txLog = new SpyTxLog(dataFiles[i].getAbsolutePath()); txLog.restore(commitedTxs); txLogs.put(new Integer(rollOver),txLog); messageLogs.put(txLog, new HashMap()); } } if(!commitedTxs.isEmpty()) nextTxId = ((Long)commitedTxs.last()).longValue(); for(int i=0;i<dataFiles.length;++i){ String name = dataFiles[i].getName(); if(!name.startsWith(TRANS_FILE_NAME)){ int rollOver = name.charAt(name.length()-1) - '0'; String key = name.substring(0,name.length()-5); SpyMessageLog messageLog = new SpyMessageLog(dataFiles[i].getAbsolutePath()); SpyMessage [] messages = messageLog.restore(commitedTxs); SpyTxLog txLog = (SpyTxLog)txLogs.get(new Integer(rollOver)); SpyDestination dest = (SpyDestination)queues.get(key); JMSDestination q = server.getJMSDestination(dest); LogInfo info = new LogInfo(messageLog,dest,txLog); info.liveMessages = messages.length; HashMap logs = (HashMap)messageLogs.get(txLog); logs.put(key,info); //TODO: make sure this lock is good enough 
synchronized(q){ for(int j=0;j<messages.length;j++){ messages[j].persistData = info; q.restoreMessage(messages[j]); } } } } try{ URL txLogFile = new URL(dataDirURL, TRANS_FILE_NAME+numRollOvers); currentTxLog = new SpyTxLog(txLogFile.getFile()); messageLogs.put(currentTxLog, new HashMap()); for(Iterator it = queues.values().iterator();it.hasNext();){ SpyDestination dest = (SpyDestination)it.next(); String key = ""+dest; URL logFile = new URL(dataDirURL, dest.toString()+".dat"+numRollOvers); SpyMessageLog log = new SpyMessageLog(logFile.getFile()); synchronized(messageLogs){ LogInfo logInfo = new LogInfo(log, dest, currentTxLog); HashMap logs = (HashMap)messageLogs.get(currentTxLog); logs.put(key, logInfo); } } } catch (Exception e) { javax.jms.JMSException newE = new javax.jms.JMSException("Invalid configuration."); newE.setLinkedException(e); throw newE; } } /** * Insert the method's description here. * Creation date: (6/27/2001 12:53:12 AM) * @param newDataDirectory java.lang.String */ public void setDataDirectory(java.lang.String newDataDirectory) { dataDirectory = newDataDirectory; } public void startService() throws Exception { JMSServer server = (JMSServer)getServer().invoke(new ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] {}, new String[] {} ); restore(server); } } 1.1 jbossmq/src/main/org/jbossmq/pm/rollinglogged/PersistenceManagerMBean.java Index: PersistenceManagerMBean.java =================================================================== package org.jbossmq.pm.rollinglogged; /* * jBoss, the OpenSource EJB server * * Distributable under LGPL license. * See terms of license at gnu.org. */ /** * <description> * MBean interface for the JBossMQ JMX service. 
* * @see <related> * @author Vincent Sheffer ([EMAIL PROTECTED]) * @version $Revision: 1.1 $ */ public interface PersistenceManagerMBean extends org.jboss.util.ServiceMBean { // Constants ----------------------------------------------------- public static final String OBJECT_NAME = ":service=JBossMQ"; // Public -------------------------------------------------------- // Public -------------------------------------------------------- // Public -------------------------------------------------------- // Public -------------------------------------------------------- public java.lang.String getDataDirectory(); public void setDataDirectory(java.lang.String newDataDirectory); } 1.1 jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyMessageLog.java Index: SpyMessageLog.java =================================================================== /* * JBossMQ, the OpenSource JMS implementation * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jbossmq.pm.rollinglogged; import java.io.IOException; import java.io.Serializable; import javax.jms.JMSException; import org.jbossmq.SpyMessage; /** * This is used to keep a log of SpyMessages arriving and leaving * a queue. The log can be used reconstruct the queue in case of * provider failure. Integrety is kept by the use of an ObjectIntegrityLog. 
* * @author: Hiram Chirino ([EMAIL PROTECTED]) * @version $Revision: 1.1 $ */ public class SpyMessageLog { ///////////////////////////////////////////////////////////////////// // Attributes ///////////////////////////////////////////////////////////////////// private ObjectIntegrityLog transactionLog; private MessageAddedRecord messageAddedRecord = new MessageAddedRecord(); private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord(); ///////////////////////////////////////////////////////////////////// // Helper Inner Classes ///////////////////////////////////////////////////////////////////// static class MessageAddedRecord implements Serializable { long messageId; boolean isTransacted; long transactionId; SpyMessage message; private final static long serialVersionUID = 235726945332013954L; } static class MessageRemovedRecord implements Serializable { boolean isTransacted; long transactionId; long messageId; private final static long serialVersionUID = 235726945332013955L; } ///////////////////////////////////////////////////////////////////// // Constructor ///////////////////////////////////////////////////////////////////// public SpyMessageLog(String fileName) throws JMSException { try { transactionLog = new ObjectIntegrityLog(fileName); } catch ( IOException e ) { throwJMSException("Could not open the queue's tranaction log: "+fileName,e); } } ///////////////////////////////////////////////////////////////////// // Public Methods ///////////////////////////////////////////////////////////////////// synchronized public void close() throws JMSException { try{ transactionLog.close(); } catch ( IOException e ) { throwJMSException("Could not close the queue's tranaction log.",e); } } synchronized public void delete()throws JMSException{ try{ transactionLog.delete(); } catch ( IOException e ) { throwJMSException("Could not delete the queue's tranaction log.",e); } } synchronized public void add( SpyMessage message, Long transactionId ) throws 
JMSException { try{ messageAddedRecord.message = message; messageAddedRecord.messageId = message.messageId; if( transactionId == null ) { messageAddedRecord.isTransacted = false; } else { messageAddedRecord.isTransacted = true; messageAddedRecord.transactionId = transactionId.longValue(); } transactionLog.add(messageAddedRecord); transactionLog.commit(); } catch ( IOException e ) { throwJMSException("Could not write to the tranaction log.",e); } } synchronized public void remove( SpyMessage message, Long transactionId ) throws JMSException { try{ messageRemovedRecord.messageId = message.messageId; if( transactionId == null ) { messageRemovedRecord.isTransacted = false; } else { messageRemovedRecord.isTransacted = true; messageRemovedRecord.transactionId = transactionId.longValue(); } transactionLog.add(messageRemovedRecord); transactionLog.commit(); } catch ( IOException e ) { throwJMSException("Could not write to the queue's tranaction log.",e); } } synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws JMSException { java.util.HashMap messageIndex = new java.util.HashMap(); try { ObjectIntegrityLog.IndexItem objects[] = transactionLog.toIndex(); for( int i=0; i < objects.length; i++ ) { Object o = objects[i].record; if( o instanceof MessageAddedRecord ) { MessageAddedRecord r = (MessageAddedRecord)o; r.message.messageId = r.messageId; if( r.isTransacted && !commited.contains(new Long(r.transactionId)) ) { // the TX this message was part of was not // commited... so drop this message continue; } messageIndex.put( new Long(r.messageId), objects[i]); } else if( o instanceof MessageRemovedRecord ) { MessageRemovedRecord r = (MessageRemovedRecord)o; if( r.isTransacted && !commited.contains(new Long(r.transactionId)) ) { // the TX this message was part of was not // commited... 
so drop this message continue; } messageIndex.remove( new Long(r.messageId)); } } } catch ( Exception e ) { throwJMSException("Could not rebuild the queue from the queue's tranaction log.",e); } SpyMessage rc[] = new SpyMessage[messageIndex.size()]; java.util.Iterator iter = messageIndex.values().iterator(); for( int i=0; iter.hasNext(); i++ ) { ObjectIntegrityLog.IndexItem item = (ObjectIntegrityLog.IndexItem)iter.next(); rc[i] = ((MessageAddedRecord)item.record).message; } return rc; } private void throwJMSException(String message, Exception e) throws JMSException { JMSException newE = new JMSException(message); newE.setLinkedException(e); throw newE; } } 1.1 jbossmq/src/main/org/jbossmq/pm/rollinglogged/SpyTxLog.java Index: SpyTxLog.java =================================================================== /* * JBossMQ, the OpenSource JMS implementation * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jbossmq.pm.rollinglogged; import java.io.Serializable; import java.io.IOException; import javax.jms.JMSException; /** * This is used to keep a log of commited transactions. 
*
 * @author: Hiram Chirino ([EMAIL PROTECTED])
 * @version $Revision: 1.1 $
 */
public class SpyTxLog {

   /////////////////////////////////////////////////////////////////////
   // Attributes
   /////////////////////////////////////////////////////////////////////
   private ObjectIntegrityLog transactionLog;
   // transactions created on this log that are neither committed nor rolled back
   private int liveTransactionCount = 0;
   private Object counterLock = new Object();

   /////////////////////////////////////////////////////////////////////
   // Constructors
   /////////////////////////////////////////////////////////////////////

   /**
    * Opens (or creates) the transaction log stored in fileName.
    */
   public SpyTxLog(String fileName) throws JMSException {
      try {
         transactionLog = new ObjectIntegrityLog(fileName);
      } catch (IOException openFailure) {
         throw wrap("Could not open the queue's tranaction log: " + fileName, openFailure);
      }
   }

   /////////////////////////////////////////////////////////////////////
   // Public Methods
   /////////////////////////////////////////////////////////////////////
   public synchronized void close() throws JMSException {
      try {
         transactionLog.close();
      } catch (IOException closeFailure) {
         throw wrap("Could not close the queue's tranaction log.", closeFailure);
      }
   }

   public synchronized void delete() throws JMSException {
      try {
         transactionLog.delete();
      } catch (IOException deleteFailure) {
         throw wrap("Could not delete the queue's tranaction log.", deleteFailure);
      }
   }

   /** Counts one more open transaction on this log. */
   public void createTx() throws JMSException {
      adjustLiveCount(1);
   }

   /**
    * @return true when no transaction is open on this log
    */
   public boolean completed() throws JMSException {
      boolean idle;
      synchronized (counterLock) {
         idle = (liveTransactionCount == 0);
      }
      return idle;
   }

   /**
    * Appends id to the log, commits the log and counts one open transaction less.
    */
   public synchronized void commitTx(Long id) throws JMSException {
      try {
         transactionLog.add(id);
         transactionLog.commit();
         adjustLiveCount(-1);
      } catch (IOException writeFailure) {
         throw wrap("Could not create a new transaction.", writeFailure);
      }
   }

   /**
    * Adds every transaction id stored in the log to result.
    */
   public synchronized void restore(java.util.TreeSet result) throws JMSException {
      try {
         java.util.TreeSet recovered = transactionLog.toTreeSet();
         result.addAll(recovered);
      } catch (Exception readFailure) {
         throw wrap("Could not restore the transaction log.", readFailure);
      }
   }

   /** Counts one open transaction less; nothing is written to the log. */
   public void rollbackTx(Long txId) throws JMSException {
      adjustLiveCount(-1);
   }

   /////////////////////////////////////////////////////////////////////
   // Private Methods
   /////////////////////////////////////////////////////////////////////

   /** Adds delta to the open-transaction counter under counterLock. */
   private void adjustLiveCount(int delta) {
      synchronized (counterLock) {
         liveTransactionCount += delta;
      }
   }

   /** Builds a JMSException carrying cause as its linked exception. */
   private static JMSException wrap(String message, Exception cause) {
      JMSException wrapped = new JMSException(message);
      wrapped.setLinkedException(cause);
      return wrapped;
   }
}

_______________________________________________
Jboss-development mailing list [EMAIL PROTECTED] http://lists.sourceforge.net/lists/listinfo/jboss-development