User: pkendall
Date: 01/08/08 18:18:28
Modified: src/main/org/jbossmq/pm PersistenceManager.java
TxManager.java
Added: src/main/org/jbossmq/pm Tx.java
Log:
Major updates (especially to topics).
Speed improvements.
Make JVM IL work (by using a singleton JMSServer).
Message Listeners re-implemented using client-side thread.
Revision Changes Path
1.4 +6 -6 jbossmq/src/main/org/jbossmq/pm/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/PersistenceManager.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- PersistenceManager.java 2001/07/28 00:33:38 1.3
+++ PersistenceManager.java 2001/08/09 01:18:28 1.4
@@ -25,7 +25,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Paul Kendall ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public interface PersistenceManager {
@@ -53,17 +53,17 @@
/**
* Create and return a unique transaction id.
*/
- public abstract Long createPersistentTx() throws javax.jms.JMSException;
+ public abstract Tx createPersistentTx() throws javax.jms.JMSException;
/**
* Commit the transaction to the persistent store.
*/
- public abstract void commitPersistentTx(Long txId) throws
javax.jms.JMSException;
+ public abstract void commitPersistentTx(Tx txId) throws javax.jms.JMSException;
/**
* Rollback the transaction.
*/
- public abstract void rollbackPersistentTx(Long txId) throws
javax.jms.JMSException;
+ public abstract void rollbackPersistentTx(Tx txId) throws
javax.jms.JMSException;
@@ -84,7 +84,7 @@
* Remove message from the persistent store.
* If the message is part of a transaction, txId is not null.
*/
- public abstract void add(SpyMessage message, Long txId) throws
javax.jms.JMSException;
+ public abstract void add(SpyMessage message, Tx txId) throws
javax.jms.JMSException;
/**
* Remove the queue, and all messages in it, from the persistent store
@@ -100,5 +100,5 @@
* Remove message from the persistent store.
* If the message is part of a transaction, txId is not null.
*/
- public abstract void remove(SpyMessage message, Long txId) throws
javax.jms.JMSException;
+ public abstract void remove(SpyMessage message, Tx txId) throws
javax.jms.JMSException;
}
1.4 +66 -60 jbossmq/src/main/org/jbossmq/pm/TxManager.java
Index: TxManager.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/TxManager.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- TxManager.java 2001/07/28 00:33:38 1.3
+++ TxManager.java 2001/08/09 01:18:28 1.4
@@ -15,8 +15,8 @@
import org.jbossmq.xml.XElement;
import org.jbossmq.SpyMessage;
import org.jbossmq.SpyDestination;
+import org.jbossmq.SpyJMSException;
-
import org.jbossmq.ConnectionToken;
/**
@@ -25,7 +25,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Paul Kendall ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class TxManager {
@@ -36,6 +36,30 @@
// Maps Global transactions to local transactions
HashMap globalToLocal = new HashMap();
+ protected static int MAX_POOL_SIZE = 500;
+ //pool of linked lists to use for storing txs tasks
+ java.util.ArrayList listPool = new java.util.ArrayList();
+
+ protected LinkedList getList(){
+ synchronized(listPool){
+ if(listPool.isEmpty()){
+ return new LinkedList();
+ }else{
+ return (LinkedList)listPool.remove(listPool.size()-1);
+ }
+ }
+ }
+
+ protected void releaseList(LinkedList list){
+ synchronized(listPool){
+ if(listPool.size() < MAX_POOL_SIZE){
+ list.clear();
+ listPool.add(list);
+ }
+ }
+ }
+
+
class GlobalXID implements Runnable {
ConnectionToken dc;
Object xid;
@@ -67,42 +91,41 @@
/**
* Create and return a unique transaction id.
*/
- public final Long createTx() throws javax.jms.JMSException {
- Long txId = persistenceManager.createPersistentTx();
+ public final Tx createTx() throws javax.jms.JMSException {
+ Tx txId = persistenceManager.createPersistentTx();
synchronized (postCommitTasks) {
- postCommitTasks.put(txId, new LinkedList());
- postRollbackTasks.put(txId, new LinkedList());
+ postCommitTasks.put(txId, getList());
+ postRollbackTasks.put(txId, getList());
}
return txId;
}
-
-
/**
* Commit the transaction to the persistent store.
*/
- public final void commitTx(Long txId) throws javax.jms.JMSException {
+ public final void commitTx(Tx txId) throws javax.jms.JMSException {
LinkedList tasks;
synchronized( postCommitTasks ) {
tasks = (LinkedList)postCommitTasks.remove(txId);
- postRollbackTasks.remove(txId);
+ releaseList((LinkedList)postRollbackTasks.remove(txId));
}
if( tasks == null )
throw new javax.jms.JMSException("Transaction is not active
for commit.");
- persistenceManager.commitPersistentTx(txId);
+ persistenceManager.commitPersistentTx(txId);
- synchronized(tasks){
- Iterator iter = tasks.iterator();
- while( iter.hasNext() ) {
- Runnable task = (Runnable)iter.next();
- task.run();
- }
- }
- }
+ synchronized(tasks){
+ Iterator iter = tasks.iterator();
+ while( iter.hasNext() ) {
+ Runnable task = (Runnable)iter.next();
+ task.run();
+ }
+ }
+ releaseList(tasks);
+ }
- public final void addPostCommitTask(Long txId, Runnable task) throws
javax.jms.JMSException {
+ public final void addPostCommitTask(Tx txId, Runnable task) throws
javax.jms.JMSException {
if( txId == null ) {
task.run();
@@ -124,29 +147,30 @@
/**
* Rollback the transaction.
*/
- public final void rollbackTx(Long txId) throws javax.jms.JMSException {
+ public final void rollbackTx(Tx txId) throws javax.jms.JMSException {
LinkedList tasks;
synchronized( postCommitTasks ) {
tasks = (LinkedList)postRollbackTasks.remove(txId);
- postCommitTasks.remove(txId);
- }
+ releaseList((LinkedList)postCommitTasks.remove(txId));
+ }
if( tasks == null )
throw new javax.jms.JMSException("Transaction is not active
3.");
- persistenceManager.rollbackPersistentTx(txId);
+ persistenceManager.rollbackPersistentTx(txId);
- synchronized(tasks){
- Iterator iter = tasks.iterator();
- while( iter.hasNext() ) {
- Runnable task = (Runnable)iter.next();
- task.run();
- }
- }
+ synchronized(tasks){
+ Iterator iter = tasks.iterator();
+ while( iter.hasNext() ) {
+ Runnable task = (Runnable)iter.next();
+ task.run();
+ }
+ }
- }
+ releaseList(tasks);
+ }
- public final void addPostRollbackTask(Long txId, Runnable task) throws
javax.jms.JMSException {
+ public final void addPostRollbackTask(Tx txId, Runnable task) throws
javax.jms.JMSException {
if( txId == null ) {
return;
@@ -164,24 +188,6 @@
}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
PersistenceManager persistenceManager;
public TxManager( PersistenceManager pm ) {
@@ -195,15 +201,15 @@
* allocate a unique local transaction id if the remote id is not already
* known.
*/
- public final Long createTx(ConnectionToken dc, Object xid) throws
javax.jms.JMSException {
+ public final Tx createTx(ConnectionToken dc, Object xid) throws
javax.jms.JMSException {
GlobalXID gxid = new GlobalXID(dc, xid);
synchronized(globalToLocal){
if( globalToLocal.containsKey(gxid) )
- throw new JMSException("Duplicate transaction from:
"+dc.getClientID()+" xid="+xid);
+ throw new SpyJMSException("Duplicate transaction from:
"+dc.getClientID()+" xid="+xid);
}
- Long txId = createTx();
+ Tx txId = createTx();
synchronized(globalToLocal){
globalToLocal.put(gxid, txId);
}
@@ -213,21 +219,21 @@
addPostRollbackTask(txId, gxid);
return txId;
- }
+ }
/**
* Return the local transaction id for a distributed transaction id.
*/
- public final Long getPrepared(ConnectionToken dc, Object xid) throws
javax.jms.JMSException {
+ public final Tx getPrepared(ConnectionToken dc, Object xid) throws
javax.jms.JMSException {
GlobalXID gxid = new GlobalXID(dc, xid);
- Long txid;
+ Tx txid;
synchronized(globalToLocal){
- txid = (Long)globalToLocal.get(gxid);
+ txid = (Tx)globalToLocal.get(gxid);
}
if( txid == null )
- throw new JMSException("Transaction does not exist from:
"+dc.getClientID()+" xid="+xid);
+ throw new SpyJMSException("Transaction does not exist from:
"+dc.getClientID()+" xid="+xid);
return txid;
- }
+ }
}
1.1 jbossmq/src/main/org/jbossmq/pm/Tx.java
Index: Tx.java
===================================================================
package org.jbossmq.pm;
public class Tx implements Comparable, java.io.Serializable, java.io.Externalizable{
long value = 0;
public Tx() {
}
public Tx(long value) {
this.value = value;
}
public void setValue(long tx){
value = tx;
}
public int hashCode() {
return (int)(value ^ (value >> 32));
}
public int compareTo(Tx anotherLong) {
long thisVal = this.value;
long anotherVal = anotherLong.value;
return (thisVal<anotherVal ? -1 : (thisVal==anotherVal ? 0 : 1));
}
public int compareTo(Object o) {
return compareTo((Tx)o);
}
public void readExternal(java.io.ObjectInput in) throws java.io.IOException{
value = in.readLong();
}
public void writeExternal(java.io.ObjectOutput out) throws java.io.IOException{
out.writeLong(value);
}
public long longValue(){
return value;
}
public String toString(){
return ""+value;
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development