User: pkendall
Date: 01/08/08 18:18:28
Modified: src/main/org/jbossmq/pm/logged PersistenceManager.java
SpyMessageLog.java SpyMessageLogTester.java
SpyTxLog.java
Log:
Major updates (especially to topics).
Speed improvements.
Make JVM IL work (by using a singleton JMSServer).
Message Listeners re-implemented using client-side thread.
Revision Changes Path
1.4 +21 -19 jbossmq/src/main/org/jbossmq/pm/logged/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/PersistenceManager.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- PersistenceManager.java 2001/07/30 03:39:15 1.3
+++ PersistenceManager.java 2001/08/09 01:18:28 1.4
@@ -19,8 +19,8 @@
import org.jbossmq.server.JMSDestination;
import org.jbossmq.SpyMessage;
import org.jbossmq.SpyDestination;
+import org.jbossmq.SpyJMSException;
-
import javax.naming.InitialContext;
import org.jbossmq.pm.TxManager;
import org.jboss.util.ServiceMBeanSupport;
@@ -32,13 +32,18 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
-public class PersistenceManager
- extends ServiceMBeanSupport
+public class PersistenceManager
+ extends ServiceMBeanSupport
implements org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean,
MBeanRegistration {
-
+ /**
+ * NewPersistenceManager constructor.
+ */
+ public PersistenceManager() throws javax.jms.JMSException {
+ txManager = new TxManager( this );
+ }
private String dataDirectory;
// Log file used to store commited transactions.
@@ -58,19 +63,16 @@
}
- public PersistenceManager() {
- txManager = new TxManager(this);
- }
- public Long createPersistentTx() throws javax.jms.JMSException {
+ public org.jbossmq.pm.Tx createPersistentTx() throws javax.jms.JMSException {
return spyTxLog.createTx();
}
- public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
+ public void commitPersistentTx(org.jbossmq.pm.Tx txId) throws
javax.jms.JMSException {
spyTxLog.commitTx(txId);
}
- public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
+ public void rollbackPersistentTx(org.jbossmq.pm.Tx txId) throws
javax.jms.JMSException {
spyTxLog.rollbackTx(txId);
}
@@ -111,15 +113,15 @@
public void initService() throws Exception {
URL configFile = getClass().getClassLoader().getResource("jboss.jcml");
-
+
dataDirURL = new URL(configFile, dataDirectory);
URL txLogFile = new URL(dataDirURL, "transactions.dat");
spyTxLog = new SpyTxLog(txLogFile.getFile());
-
+
//Get an InitialContext
- JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {}, new String[] {} );
+ JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {}, new String[] {} );
server.setPersistenceManager(this);
-
+
}
public void restore(org.jbossmq.server.JMSServer server) throws
javax.jms.JMSException {
@@ -160,12 +162,12 @@
public void startService() throws Exception {
- JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {}, new String[] {} );
+ JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {}, new String[] {} );
restore(server);
-
+
}
- public void add(org.jbossmq.SpyMessage message, Long txId) throws
javax.jms.JMSException {
+ public void add(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId) throws
javax.jms.JMSException {
LogInfo logInfo;
@@ -189,7 +191,7 @@
SpyMessageLog log = (SpyMessageLog)messageLogs.remove(""+dest);
if( log == null )
- throw new JMSException("The persistence log was never
initialized");
+ throw new SpyJMSException("The persistence log was
never initialized");
log.close();
file.delete();
@@ -225,7 +227,7 @@
}
- public void remove(org.jbossmq.SpyMessage message, Long txId) throws
javax.jms.JMSException {
+ public void remove(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId)
throws javax.jms.JMSException {
LogInfo logInfo;
1.3 +40 -40 jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLog.java
Index: SpyMessageLog.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLog.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyMessageLog.java 2001/07/16 02:51:46 1.2
+++ SpyMessageLog.java 2001/08/09 01:18:28 1.3
@@ -6,35 +6,35 @@
*/
package org.jbossmq.pm.logged;
+import org.jbossmq.SpyJMSException;
+
import java.io.IOException;
import java.io.Serializable;
-
-
import javax.jms.JMSException;
import org.jbossmq.SpyMessage;
/**
- * This is used to keep a log of SpyMessages arriving and leaving
+ * This is used to keep a log of SpyMessages arriving and leaving
* a queue. The log can be used reconstruct the queue in case of
* provider failure. Integrety is kept by the use of an ObjectIntegrityLog.
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyMessageLog {
/////////////////////////////////////////////////////////////////////
// Attributes
- /////////////////////////////////////////////////////////////////////
+ /////////////////////////////////////////////////////////////////////
private ObjectIntegrityLog transactionLog;
private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
/////////////////////////////////////////////////////////////////////
// Helper Inner Classes
- /////////////////////////////////////////////////////////////////////
+ /////////////////////////////////////////////////////////////////////
static class MessageAddedRecord implements Serializable {
long messageId;
boolean isTransacted;
@@ -42,7 +42,7 @@
SpyMessage message;
private final static long serialVersionUID = 235726945332013954L;
}
-
+
static class MessageRemovedRecord implements Serializable {
boolean isTransacted;
long transactionId;
@@ -50,33 +50,33 @@
private final static long serialVersionUID = 235726945332013955L;
}
-
+
/////////////////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////////////////
public SpyMessageLog(String fileName) throws JMSException {
try {
- transactionLog = new ObjectIntegrityLog(fileName);
+ transactionLog = new ObjectIntegrityLog(fileName);
} catch ( IOException e ) {
throwJMSException("Could not open the queue's tranaction log:
"+fileName,e);
}
}
+
-
/////////////////////////////////////////////////////////////////////
// Public Methods
/////////////////////////////////////////////////////////////////////
synchronized public void close() throws JMSException {
try{
- transactionLog.close();
+ transactionLog.close();
} catch ( IOException e ) {
throwJMSException("Could not close the queue's tranaction
log.",e);
}
}
- synchronized public void add( SpyMessage message, Long transactionId ) throws
JMSException {
+ synchronized public void add( SpyMessage message, org.jbossmq.pm.Tx
transactionId ) throws JMSException {
try{
-
+
messageAddedRecord.message = message;
messageAddedRecord.messageId = message.messageId;
if( transactionId == null ) {
@@ -85,19 +85,19 @@
messageAddedRecord.isTransacted = true;
messageAddedRecord.transactionId =
transactionId.longValue();
}
-
+
transactionLog.add(messageAddedRecord);
transactionLog.commit();
-
+
} catch ( IOException e ) {
throwJMSException("Could not write to the tranaction log.",e);
}
-
- }
-
- synchronized public void remove( SpyMessage message, Long transactionId )
throws JMSException {
+
+ }
+
+ synchronized public void remove( SpyMessage message, org.jbossmq.pm.Tx
transactionId ) throws JMSException {
try{
-
+
messageRemovedRecord.messageId = message.messageId;
if( transactionId == null ) {
messageRemovedRecord.isTransacted = false;
@@ -107,25 +107,25 @@
}
transactionLog.add(messageRemovedRecord);
transactionLog.commit();
-
+
} catch ( IOException e ) {
throwJMSException("Could not write to the queue's tranaction
log.",e);
}
- }
-
+ }
+
synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws
JMSException {
java.util.HashMap messageIndex = new java.util.HashMap();
-
- try {
+
+ try {
ObjectIntegrityLog.IndexItem objects[] =
transactionLog.toIndex();
-
+
for( int i=0; i < objects.length; i++ ) {
-
+
Object o = objects[i].record;
if( o instanceof MessageAddedRecord ) {
-
+
MessageAddedRecord r = (MessageAddedRecord)o;
r.message.messageId = r.messageId;
@@ -134,11 +134,11 @@
// commited... so drop this message
continue;
}
-
+
messageIndex.put( new Long(r.messageId),
objects[i]);
-
+
} else if( o instanceof MessageRemovedRecord ) {
-
+
MessageRemovedRecord r =
(MessageRemovedRecord)o;
if( r.isTransacted && !commited.contains(new
Long(r.transactionId)) ) {
@@ -146,11 +146,11 @@
// commited... so drop this message
continue;
}
-
+
messageIndex.remove( new Long(r.messageId));
-
+
}
-
+
}
} catch ( Exception e ) {
throwJMSException("Could not rebuild the queue from the
queue's tranaction log.",e);
@@ -162,13 +162,13 @@
ObjectIntegrityLog.IndexItem item =
(ObjectIntegrityLog.IndexItem)iter.next();
rc[i] = ((MessageAddedRecord)item.record).message;
}
- return rc;
- }
-
+ return rc;
+ }
+
private void throwJMSException(String message, Exception e) throws
JMSException {
- JMSException newE = new JMSException(message);
+ JMSException newE = new SpyJMSException(message);
newE.setLinkedException(e);
- throw newE;
+ throw newE;
}
-
+
}
1.3 +13 -13 jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLogTester.java
Index: SpyMessageLogTester.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLogTester.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyMessageLogTester.java 2001/07/16 02:51:46 1.2
+++ SpyMessageLogTester.java 2001/08/09 01:18:28 1.3
@@ -12,10 +12,10 @@
/**
* This class was used to perform unit testing on the SpyMessageLog/SpyTxLog
- *
*
+ *
* @author: Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyMessageLogTester {
@@ -27,10 +27,10 @@
SpyTxLog tm = new SpyTxLog("SpyTxManager1.dat");
SpyMessageLog log = new SpyMessageLog("SpyMessageLog1.dat");
-
- try{
+
+ try{
- java.util.TreeSet commited = tm.restore();
+ java.util.TreeSet commited = tm.restore();
SpyMessage[] queue = log.restore(commited);
System.out.println("Recovered :"+queue.length+" message from
the message log");
@@ -39,9 +39,9 @@
System.out.println(" #"+i+": "+queue[i]);
maxMessageId = Math.max(maxMessageId,
queue[i].messageId );
}
+
+ org.jbossmq.pm.Tx tx1 = tm.createTx();
- Long tx1 = tm.createTx();
-
long first = ++maxMessageId;
add(log, first,tx1);
long second = ++maxMessageId;
@@ -51,14 +51,14 @@
System.out.println("Commiting");
tm.commitTx(tx1);
- Long tx2 = tm.createTx();
+ org.jbossmq.pm.Tx tx2 = tm.createTx();
add(log, first,tx2);
System.out.println("Rolling back");
tm.rollbackTx(tx2);
add(log, second+1, null);
-
+
System.exit(0);
} finally {
log.close();
@@ -67,17 +67,17 @@
}
- public static void add(SpyMessageLog log, long messageId, Long txid) throws
Exception {
+ public static void add(SpyMessageLog log, long messageId, org.jbossmq.pm.Tx
txid) throws Exception {
SpyTextMessage m = new SpyTextMessage();
m.messageId = messageId;
m.setText("Hello World #"+m.messageId);
System.out.println("Adding message: "+m+",tx="+txid);
log.add(m,txid);
+
+ }
- }
-
- public static void remove(SpyMessageLog log, long messageId, Long txid) throws
Exception {
+ public static void remove(SpyMessageLog log, long messageId, org.jbossmq.pm.Tx
txid) throws Exception {
SpyTextMessage m = new SpyTextMessage();
m.messageId = messageId;
1.3 +22 -21 jbossmq/src/main/org/jbossmq/pm/logged/SpyTxLog.java
Index: SpyTxLog.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/SpyTxLog.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyTxLog.java 2001/07/16 02:51:46 1.2
+++ SpyTxLog.java 2001/08/09 01:18:28 1.3
@@ -5,7 +5,8 @@
* See terms of license at gnu.org.
*/
package org.jbossmq.pm.logged;
-
+import org.jbossmq.SpyJMSException;
+
import java.io.Serializable;
import java.io.IOException;
@@ -15,7 +16,7 @@
* This is used to keep a log of commited transactions.
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyTxLog {
@@ -24,7 +25,7 @@
/////////////////////////////////////////////////////////////////////
private ObjectIntegrityLog transactionLog;
private long nextTransactionId = Long.MIN_VALUE;
-
+
/////////////////////////////////////////////////////////////////////
// Constructors
/////////////////////////////////////////////////////////////////////
@@ -41,35 +42,35 @@
/////////////////////////////////////////////////////////////////////
synchronized public void close() throws JMSException {
try{
- transactionLog.close();
+ transactionLog.close();
} catch ( IOException e ) {
throwJMSException("Could not close the queue's tranaction
log.",e);
}
}
-
- synchronized public void commitTx(Long id) throws JMSException {
-
+
+ synchronized public void commitTx(org.jbossmq.pm.Tx id) throws JMSException {
+
try {
transactionLog.add(id);
transactionLog.commit();
} catch ( IOException e ) {
throwJMSException("Could not create a new transaction.",e);
}
-
+
}
-
- synchronized public Long createTx() throws JMSException {
- return new Long(nextTransactionId++);
+
+ synchronized public org.jbossmq.pm.Tx createTx() throws JMSException {
+ return new org.jbossmq.pm.Tx(nextTransactionId++);
}
-
+
synchronized public java.util.TreeSet restore() throws JMSException {
-
+
java.util.TreeSet items=null;
try {
items = transactionLog.toTreeSet();
} catch ( Exception e ) {
throwJMSException("Could not restore the transaction log.",e);
- }
+ }
long maxId = Long.MIN_VALUE;
java.util.Iterator iter = items.iterator();
@@ -81,20 +82,20 @@
nextTransactionId = maxId+1;
return items;
-
+
}
- synchronized public void rollbackTx(Long txId) throws JMSException {
-
+ synchronized public void rollbackTx(org.jbossmq.pm.Tx txId) throws
JMSException {
+
}
-
+
/////////////////////////////////////////////////////////////////////
// Private Methods
/////////////////////////////////////////////////////////////////////
private void throwJMSException(String message, Exception e) throws
JMSException {
- JMSException newE = new JMSException(message);
+ JMSException newE = new SpyJMSException(message);
newE.setLinkedException(e);
- throw newE;
+ throw newE;
}
-
+
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development