User: pkendall Date: 01/08/08 18:18:28 Modified: src/main/org/jbossmq/pm/logged PersistenceManager.java SpyMessageLog.java SpyMessageLogTester.java SpyTxLog.java Log: Major updates (especially to topics). Speed improvements. Make JVM IL work (by using a singleton JMSServer). Message Listeners re-implemented using client-side thread. Revision Changes Path 1.4 +21 -19 jbossmq/src/main/org/jbossmq/pm/logged/PersistenceManager.java Index: PersistenceManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/PersistenceManager.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- PersistenceManager.java 2001/07/30 03:39:15 1.3 +++ PersistenceManager.java 2001/08/09 01:18:28 1.4 @@ -19,8 +19,8 @@ import org.jbossmq.server.JMSDestination; import org.jbossmq.SpyMessage; import org.jbossmq.SpyDestination; +import org.jbossmq.SpyJMSException; - import javax.naming.InitialContext; import org.jbossmq.pm.TxManager; import org.jboss.util.ServiceMBeanSupport; @@ -32,13 +32,18 @@ * * @author Hiram Chirino ([EMAIL PROTECTED]) * - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ -public class PersistenceManager - extends ServiceMBeanSupport +public class PersistenceManager + extends ServiceMBeanSupport implements org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean, MBeanRegistration { - + /** + * NewPersistenceManager constructor. + */ + public PersistenceManager() throws javax.jms.JMSException { + txManager = new TxManager( this ); + } private String dataDirectory; // Log file used to store commited transactions. @@ -58,19 +63,16 @@ } - public PersistenceManager() { - txManager = new TxManager(this); - } - public Long createPersistentTx() throws javax.jms.JMSException { + public org.jbossmq.pm.Tx createPersistentTx() throws javax.jms.JMSException { return spyTxLog.createTx(); } - public void commitPersistentTx(Long txId) throws javax.jms.JMSException { + public void commitPersistentTx(org.jbossmq.pm.Tx txId) throws javax.jms.JMSException { spyTxLog.commitTx(txId); } - public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException { + public void rollbackPersistentTx(org.jbossmq.pm.Tx txId) throws javax.jms.JMSException { spyTxLog.rollbackTx(txId); } @@ -111,15 +113,15 @@ public void initService() throws Exception { URL configFile = getClass().getClassLoader().getResource("jboss.jcml"); - + dataDirURL = new URL(configFile, dataDirectory); URL txLogFile = new URL(dataDirURL, "transactions.dat"); spyTxLog = new SpyTxLog(txLogFile.getFile()); - + //Get an InitialContext - JMSServer server = (JMSServer)getServer().invoke(new ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] {}, new String[] {} ); + JMSServer server = (JMSServer)getServer().invoke(new ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] {}, new String[] {} ); server.setPersistenceManager(this); - + } public void restore(org.jbossmq.server.JMSServer server) throws javax.jms.JMSException { @@ -160,12 +162,12 @@ public void startService() throws Exception { - JMSServer server = (JMSServer)getServer().invoke(new ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] {}, new String[] {} ); + JMSServer server = (JMSServer)getServer().invoke(new ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] {}, new String[] {} ); restore(server); - + } - public void add(org.jbossmq.SpyMessage message, Long txId) throws javax.jms.JMSException { + public void add(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId) throws javax.jms.JMSException { LogInfo logInfo; @@ -189,7 +191,7 @@ SpyMessageLog log = (SpyMessageLog)messageLogs.remove(""+dest); if( log == null ) - throw new JMSException("The persistence log was never initialized"); + throw new SpyJMSException("The persistence log was never initialized"); log.close(); file.delete(); @@ -225,7 +227,7 @@ } - public void remove(org.jbossmq.SpyMessage message, Long txId) throws javax.jms.JMSException { + public void remove(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId) throws javax.jms.JMSException { LogInfo logInfo; 1.3 +40 -40 jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLog.java Index: SpyMessageLog.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLog.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- SpyMessageLog.java 2001/07/16 02:51:46 1.2 +++ SpyMessageLog.java 2001/08/09 01:18:28 1.3 @@ -6,35 +6,35 @@ */ package org.jbossmq.pm.logged; +import org.jbossmq.SpyJMSException; + import java.io.IOException; import java.io.Serializable; - - import javax.jms.JMSException; import org.jbossmq.SpyMessage; /** - * This is used to keep a log of SpyMessages arriving and leaving + * This is used to keep a log of SpyMessages arriving and leaving * a queue. The log can be used reconstruct the queue in case of * provider failure. Integrety is kept by the use of an ObjectIntegrityLog. * * @author: Hiram Chirino ([EMAIL PROTECTED]) - * @version $Revision: 1.2 $ + * @version $Revision: 1.3 $ */ public class SpyMessageLog { ///////////////////////////////////////////////////////////////////// // Attributes - ///////////////////////////////////////////////////////////////////// + ///////////////////////////////////////////////////////////////////// private ObjectIntegrityLog transactionLog; private MessageAddedRecord messageAddedRecord = new MessageAddedRecord(); private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord(); ///////////////////////////////////////////////////////////////////// // Helper Inner Classes - ///////////////////////////////////////////////////////////////////// + ///////////////////////////////////////////////////////////////////// static class MessageAddedRecord implements Serializable { long messageId; boolean isTransacted; @@ -42,7 +42,7 @@ SpyMessage message; private final static long serialVersionUID = 235726945332013954L; } - + static class MessageRemovedRecord implements Serializable { boolean isTransacted; long transactionId; @@ -50,33 +50,33 @@ private final static long serialVersionUID = 235726945332013955L; } - + ///////////////////////////////////////////////////////////////////// // Constructor ///////////////////////////////////////////////////////////////////// public SpyMessageLog(String fileName) throws JMSException { try { - transactionLog = new ObjectIntegrityLog(fileName); + transactionLog = new ObjectIntegrityLog(fileName); } catch ( IOException e ) { throwJMSException("Could not open the queue's tranaction log: "+fileName,e); } } + - ///////////////////////////////////////////////////////////////////// // Public Methods ///////////////////////////////////////////////////////////////////// synchronized public void close() throws JMSException { try{ - transactionLog.close(); + transactionLog.close(); } catch ( IOException e ) { throwJMSException("Could not close the queue's tranaction log.",e); } } - synchronized public void add( SpyMessage message, Long transactionId ) throws JMSException { + synchronized public void add( SpyMessage message, org.jbossmq.pm.Tx transactionId ) throws JMSException { try{ - + messageAddedRecord.message = message; messageAddedRecord.messageId = message.messageId; if( transactionId == null ) { @@ -85,19 +85,19 @@ messageAddedRecord.isTransacted = true; messageAddedRecord.transactionId = transactionId.longValue(); } - + transactionLog.add(messageAddedRecord); transactionLog.commit(); - + } catch ( IOException e ) { throwJMSException("Could not write to the tranaction log.",e); } - - } - - synchronized public void remove( SpyMessage message, Long transactionId ) throws JMSException { + + } + + synchronized public void remove( SpyMessage message, org.jbossmq.pm.Tx transactionId ) throws JMSException { try{ - + messageRemovedRecord.messageId = message.messageId; if( transactionId == null ) { messageRemovedRecord.isTransacted = false; @@ -107,25 +107,25 @@ } transactionLog.add(messageRemovedRecord); transactionLog.commit(); - + } catch ( IOException e ) { throwJMSException("Could not write to the queue's tranaction log.",e); } - } - + } + synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws JMSException { java.util.HashMap messageIndex = new java.util.HashMap(); - - try { + + try { ObjectIntegrityLog.IndexItem objects[] = transactionLog.toIndex(); - + for( int i=0; i < objects.length; i++ ) { - + Object o = objects[i].record; if( o instanceof MessageAddedRecord ) { - + MessageAddedRecord r = (MessageAddedRecord)o; r.message.messageId = r.messageId; @@ -134,11 +134,11 @@ // commited... so drop this message continue; } - + messageIndex.put( new Long(r.messageId), objects[i]); - + } else if( o instanceof MessageRemovedRecord ) { - + MessageRemovedRecord r = (MessageRemovedRecord)o; if( r.isTransacted && !commited.contains(new Long(r.transactionId)) ) { @@ -146,11 +146,11 @@ // commited... so drop this message continue; } - + messageIndex.remove( new Long(r.messageId)); - + } - + } } catch ( Exception e ) { throwJMSException("Could not rebuild the queue from the queue's tranaction log.",e); @@ -162,13 +162,13 @@ ObjectIntegrityLog.IndexItem item = (ObjectIntegrityLog.IndexItem)iter.next(); rc[i] = ((MessageAddedRecord)item.record).message; } - return rc; - } - + return rc; + } + private void throwJMSException(String message, Exception e) throws JMSException { - JMSException newE = new JMSException(message); + JMSException newE = new SpyJMSException(message); newE.setLinkedException(e); - throw newE; + throw newE; } - + } 1.3 +13 -13 jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLogTester.java Index: SpyMessageLogTester.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLogTester.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- SpyMessageLogTester.java 2001/07/16 02:51:46 1.2 +++ SpyMessageLogTester.java 2001/08/09 01:18:28 1.3 @@ -12,10 +12,10 @@ /** * This class was used to perform unit testing on the SpyMessageLog/SpyTxLog - * * + * * @author: Hiram Chirino ([EMAIL PROTECTED]) - * @version $Revision: 1.2 $ + * @version $Revision: 1.3 $ */ public class SpyMessageLogTester { @@ -27,10 +27,10 @@ SpyTxLog tm = new SpyTxLog("SpyTxManager1.dat"); SpyMessageLog log = new SpyMessageLog("SpyMessageLog1.dat"); - - try{ + + try{ - java.util.TreeSet commited = tm.restore(); + java.util.TreeSet commited = tm.restore(); SpyMessage[] queue = log.restore(commited); System.out.println("Recovered :"+queue.length+" message from the message log"); @@ -39,9 +39,9 @@ System.out.println(" #"+i+": "+queue[i]); maxMessageId = Math.max(maxMessageId, queue[i].messageId ); } + + org.jbossmq.pm.Tx tx1 = tm.createTx(); - Long tx1 = tm.createTx(); - long first = ++maxMessageId; add(log, first,tx1); long second = ++maxMessageId; @@ -51,14 +51,14 @@ System.out.println("Commiting"); tm.commitTx(tx1); - Long tx2 = tm.createTx(); + org.jbossmq.pm.Tx tx2 = tm.createTx(); add(log, first,tx2); System.out.println("Rolling back"); tm.rollbackTx(tx2); add(log, second+1, null); - + System.exit(0); } finally { log.close(); @@ -67,17 +67,17 @@ } - public static void add(SpyMessageLog log, long messageId, Long txid) throws Exception { + public static void add(SpyMessageLog log, long messageId, org.jbossmq.pm.Tx txid) throws Exception { SpyTextMessage m = new SpyTextMessage(); m.messageId = messageId; m.setText("Hello World #"+m.messageId); System.out.println("Adding message: "+m+",tx="+txid); log.add(m,txid); + + } - } - - public static void remove(SpyMessageLog log, long messageId, Long txid) throws Exception { + public static void remove(SpyMessageLog log, long messageId, org.jbossmq.pm.Tx txid) throws Exception { SpyTextMessage m = new SpyTextMessage(); m.messageId = messageId; 1.3 +22 -21 jbossmq/src/main/org/jbossmq/pm/logged/SpyTxLog.java Index: SpyTxLog.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/SpyTxLog.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- SpyTxLog.java 2001/07/16 02:51:46 1.2 +++ SpyTxLog.java 2001/08/09 01:18:28 1.3 @@ -5,7 +5,8 @@ * See terms of license at gnu.org. */ package org.jbossmq.pm.logged; - +import org.jbossmq.SpyJMSException; + import java.io.Serializable; import java.io.IOException; @@ -15,7 +16,7 @@ * This is used to keep a log of commited transactions. * * @author: Hiram Chirino ([EMAIL PROTECTED]) - * @version $Revision: 1.2 $ + * @version $Revision: 1.3 $ */ public class SpyTxLog { @@ -24,7 +25,7 @@ ///////////////////////////////////////////////////////////////////// private ObjectIntegrityLog transactionLog; private long nextTransactionId = Long.MIN_VALUE; - + ///////////////////////////////////////////////////////////////////// // Constructors ///////////////////////////////////////////////////////////////////// @@ -41,35 +42,35 @@ ///////////////////////////////////////////////////////////////////// synchronized public void close() throws JMSException { try{ - transactionLog.close(); + transactionLog.close(); } catch ( IOException e ) { throwJMSException("Could not close the queue's tranaction log.",e); } } - - synchronized public void commitTx(Long id) throws JMSException { - + + synchronized public void commitTx(org.jbossmq.pm.Tx id) throws JMSException { + try { transactionLog.add(id); transactionLog.commit(); } catch ( IOException e ) { throwJMSException("Could not create a new transaction.",e); } - + } - - synchronized public Long createTx() throws JMSException { - return new Long(nextTransactionId++); + + synchronized public org.jbossmq.pm.Tx createTx() throws JMSException { + return new org.jbossmq.pm.Tx(nextTransactionId++); } - + synchronized public java.util.TreeSet restore() throws JMSException { - + java.util.TreeSet items=null; try { items = transactionLog.toTreeSet(); } catch ( Exception e ) { throwJMSException("Could not restore the transaction log.",e); - } + } long maxId = Long.MIN_VALUE; java.util.Iterator iter = items.iterator(); @@ -81,20 +82,20 @@ nextTransactionId = maxId+1; return items; - + } - synchronized public void rollbackTx(Long txId) throws JMSException { - + synchronized public void rollbackTx(org.jbossmq.pm.Tx txId) throws JMSException { + } - + ///////////////////////////////////////////////////////////////////// // Private Methods ///////////////////////////////////////////////////////////////////// private void throwJMSException(String message, Exception e) throws JMSException { - JMSException newE = new JMSException(message); + JMSException newE = new SpyJMSException(message); newE.setLinkedException(e); - throw newE; + throw newE; } - + } _______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] http://lists.sourceforge.net/lists/listinfo/jboss-development