User: hiram
Date: 00/12/18 22:43:37
Modified: src/java/org/spydermq/server StartServer.java
PersistenceManager.java JMSServerQueueReceiver.java
JMSServer.java InvocationLayerFactory.java
Log:
Add XA support! Well.. I haven't tested very much but it's a start.
Revision Changes Path
1.5 +6 -2 spyderMQ/src/java/org/spydermq/server/StartServer.java
Index: StartServer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/StartServer.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- StartServer.java 2000/12/16 03:27:50 1.4
+++ StartServer.java 2000/12/19 06:43:35 1.5
@@ -48,7 +48,7 @@
* @author Vincent Sheffer ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class StartServer implements Runnable
{
@@ -221,6 +221,8 @@
String name = element.getField("Name");
String topicConnectionFactoryJNDI =
element.getField("TopicConnectionFactoryJNDI");
String queueConnectionFactoryJNDI =
element.getField("QueueConnectionFactoryJNDI");
+ String xaTopicConnectionFactoryJNDI =
element.getField("XATopicConnectionFactoryJNDI");
+ String xaQueueConnectionFactoryJNDI =
element.getField("XAQueueConnectionFactoryJNDI");
//Set up the transports for the server
InvocationLayerFactory invocationLayerFactory= new
InvocationLayerFactory();
@@ -238,7 +240,9 @@
//(re)bind the connection factories in the JNDI
namespace
ctx.rebind(topicConnectionFactoryJNDI,invocationLayerFactory.spyTopicConnectionFactory);
-
ctx.rebind(queueConnectionFactoryJNDI,invocationLayerFactory.spyQueueConnectionFactory);
+
ctx.rebind(queueConnectionFactoryJNDI,invocationLayerFactory.spyQueueConnectionFactory);
+
ctx.rebind(xaTopicConnectionFactoryJNDI,invocationLayerFactory.spyXATopicConnectionFactory);
+
ctx.rebind(xaQueueConnectionFactoryJNDI,invocationLayerFactory.spyXAQueueConnectionFactory);
}
1.2 +99 -11 spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/PersistenceManager.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- PersistenceManager.java 2000/12/16 03:27:50 1.1
+++ PersistenceManager.java 2000/12/19 06:43:36 1.2
@@ -20,12 +20,14 @@
import org.spydermq.SpyDestination;
import org.spydermq.SpyMessage;
+import org.spydermq.SpyDistributedConnection;
+
/**
* This class manages all persistence related services.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class PersistenceManager {
@@ -103,29 +105,31 @@
public void commitTx(Long txId) throws javax.jms.JMSException {
LinkedList tasks;
- synchronized (postCommitTasks) {
- tasks = (LinkedList) postCommitTasks.remove(txId);
+ synchronized( postCommitTasks ) {
+ tasks = (LinkedList)postCommitTasks.remove(txId);
+ postRollbackTasks.remove(txId);
}
- if (tasks == null)
+ if( tasks == null )
throw new javax.jms.JMSException("Transaction is not active.");
spyTxLog.commitTx(txId);
-
+
synchronized (tasks) {
Iterator iter = tasks.iterator();
- while (iter.hasNext()) {
- Runnable task = (Runnable) iter.next();
+ while( iter.hasNext() ) {
+ Runnable task = (Runnable)iter.next();
task.run();
}
}
-
+
}
public Long createTx() throws javax.jms.JMSException {
Long txId = spyTxLog.createTx();
synchronized (postCommitTasks) {
- postCommitTasks.put(txId, new LinkedList());
+ postCommitTasks.put(txId, new LinkedList());
+ postRollbackTasks.put(txId, new LinkedList());
}
return txId;
}
@@ -198,13 +202,97 @@
public void rollbackTx(Long txId) throws javax.jms.JMSException {
LinkedList tasks;
- synchronized (postCommitTasks) {
- tasks = (LinkedList) postCommitTasks.remove(txId);
+ synchronized( postCommitTasks ) {
+ tasks = (LinkedList)postRollbackTasks.remove(txId);
+ postCommitTasks.remove(txId);
}
- if (tasks == null)
+ if( tasks == null )
throw new javax.jms.JMSException("Transaction is not active.");
spyTxLog.rollbackTx(txId);
+ synchronized (tasks) {
+ Iterator iter = tasks.iterator();
+ while( iter.hasNext() ) {
+ Runnable task = (Runnable)iter.next();
+ task.run();
+ }
+ }
+
+ }
+
+ // Maps Global transactions to local transactions
+ HashMap globalToLocal = new HashMap();
+ // Maps (Long)txIds to LinkedList of Runnable tasks
+ HashMap postRollbackTasks = new HashMap();
+
+ class GlobalXID implements Runnable {
+ SpyDistributedConnection dc;
+ Object xid;
+
+ GlobalXID(SpyDistributedConnection dc,Object xid) {
+ this.dc = dc;
+ this.xid = xid;
+ }
+
+ public boolean equals(Object obj)
+ {
+ if (obj==null) return false;
+ if (obj.getClass()!=GlobalXID.class) return false;
+ return ((GlobalXID)obj).xid.equals( xid ) &&
+ ((GlobalXID)obj).dc.equals(dc);
+ }
+
+ public int hashCode() {
+ return xid.hashCode();
+ }
+
+ public void run() {
+ synchronized (globalToLocal) {
+ globalToLocal.remove(this);
+ }
+ }
+ }
+
+ public void addPostRollbackTask(Long txId, Runnable task) throws
javax.jms.JMSException {
+
+ LinkedList tasks;
+ synchronized( postRollbackTasks ) {
+ tasks = (LinkedList)postRollbackTasks.get(txId);
+ }
+ if( tasks == null )
+ throw new javax.jms.JMSException("Transaction is not active.");
+
+ synchronized (tasks) {
+ tasks.addLast(task);
+ }
+
+ }
+
+ public Long createTx(SpyDistributedConnection dc, Object xid) throws
javax.jms.JMSException {
+
+ GlobalXID gxid = new GlobalXID(dc, xid);
+ if( globalToLocal.containsValue(gxid) )
+ throw new JMSException("Duplicate transaction from:
"+dc.getClientID()+" xid="+xid);
+
+ Long txId = createTx();
+ globalToLocal.put(gxid, txId);
+
+ //Tasks to remove the global to local mappings on commit/rollback
+ addPostCommitTask(txId, gxid);
+ addPostRollbackTask(txId, gxid);
+
+ return txId;
+ }
+
+ public Long getPrepared(SpyDistributedConnection dc, Object xid) throws
javax.jms.JMSException {
+
+ GlobalXID gxid = new GlobalXID(dc, xid);
+ Long txid = (Long)globalToLocal.get(gxid);
+
+ if( txid == null )
+ throw new JMSException("Transaction does not exist from:
"+dc.getClientID()+" xid="+xid);
+
+ return txid;
}
}
1.3 +9 -2
spyderMQ/src/java/org/spydermq/server/JMSServerQueueReceiver.java
Index: JMSServerQueueReceiver.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServerQueueReceiver.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- JMSServerQueueReceiver.java 2000/12/16 03:27:50 1.2
+++ JMSServerQueueReceiver.java 2000/12/19 06:43:36 1.3
@@ -22,7 +22,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class JMSServerQueueReceiver implements Serializable {
@@ -186,7 +186,14 @@
if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT
) {
jmsSeverQueue.server.persistenceManager.remove(jmsSeverQueue.destination, m, txId);
- }
+ }
+
+ // We have to restore the message on a rollback if
transacted
+ if( txId != null ) {
+ Runnable task = new RestoreMessageTask(m);
+
jmsSeverQueue.server.persistenceManager.addPostRollbackTask(txId, task);
+ }
+
Log.log("Message Ack: " + m.messageId);
}
}
1.3 +67 -31 spyderMQ/src/java/org/spydermq/server/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- JMSServer.java 2000/12/16 03:27:50 1.2
+++ JMSServer.java 2000/12/19 06:43:36 1.3
@@ -26,7 +26,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class JMSServer
implements Runnable, JMSServerMBean
@@ -372,37 +372,8 @@
queue.addSubscriber(dc);
}
- /**
- * The following function performs a Unit Of Work.
- *
- */
- public void transact(SpyDistributedConnection dc, Transaction t) throws
JMSException {
-
- Long txId = persistenceManager.createTx();
-
- try {
-
- if( t.messages != null ) {
- for( int i=0; i < t.messages.length; i++ ) {
- addMessage(dc, t.messages[i], txId);
- }
- }
-
- if( t.acks != null ) {
- for( int i=0; i < t.acks.length; i++ ) {
- acknowledge(dc, t.acks[i], txId);
- }
- }
-
- persistenceManager.commitTx(txId);
-
- } catch ( JMSException e ) {
- persistenceManager.rollbackTx(txId);
- throw e;
- }
-
- }
+
public void unsubscribe(SpyDistributedConnection dc,Destination dest) throws
JMSException
{
Log.log("Server:
unsubscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
@@ -439,4 +410,69 @@
return queue;
}
+ /**
+ * The following function performs a Unit Of Work.
+ *
+ */
+ public void transact(SpyDistributedConnection dc, TransactionRequest t) throws
JMSException {
+
+
+ if ( t.requestType == t.ONE_PHASE_COMMIT_REQUEST ) {
+
+ Long txId = persistenceManager.createTx();
+
+ try {
+
+ if( t.messages != null ) {
+ for( int i=0; i < t.messages.length; i++ ) {
+ addMessage(dc, t.messages[i], txId);
+ }
+ }
+
+ if( t.acks != null ) {
+ for( int i=0; i < t.acks.length; i++ ) {
+ acknowledge(dc, t.acks[i], txId);
+ }
+ }
+
+ persistenceManager.commitTx(txId);
+
+ } catch ( JMSException e ) {
+ persistenceManager.rollbackTx(txId);
+ throw e;
+ }
+ } else if ( t.requestType == t.TWO_PHASE_COMMIT_PREPARE_REQUEST) {
+
+ Long txId = persistenceManager.createTx(dc, t.xid);
+ try {
+
+ if( t.messages != null ) {
+ for( int i=0; i < t.messages.length; i++ ) {
+ addMessage(dc, t.messages[i], txId);
+ }
+ }
+
+ if( t.acks != null ) {
+ for( int i=0; i < t.acks.length; i++ ) {
+ acknowledge(dc, t.acks[i], txId);
+ }
+ }
+
+ } catch ( JMSException e ) {
+ persistenceManager.rollbackTx(txId);
+ throw e;
+ }
+ } else if ( t.requestType == t.TWO_PHASE_COMMIT_ROLLBACK_REQUEST ) {
+
+ Long txId = persistenceManager.getPrepared(dc, t.xid);
+ persistenceManager.rollbackTx(txId);
+
+ } else if ( t.requestType == t.TWO_PHASE_COMMIT_COMMIT_REQUEST ) {
+
+ Long txId = persistenceManager.getPrepared(dc, t.xid);
+ persistenceManager.commitTx(txId);
+
+ }
+
+ }
}
1.2 +7 -0
spyderMQ/src/java/org/spydermq/server/InvocationLayerFactory.java
Index: InvocationLayerFactory.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/InvocationLayerFactory.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- InvocationLayerFactory.java 2000/12/12 05:58:44 1.1
+++ InvocationLayerFactory.java 2000/12/19 06:43:36 1.2
@@ -22,6 +22,9 @@
import java.rmi.server.UnicastRemoteObject;
import java.rmi.Remote;
+import org.spydermq.SpyXAQueueConnectionFactory;
+import org.spydermq.SpyXATopicConnectionFactory;
+
public class InvocationLayerFactory
{
@@ -57,6 +60,10 @@
//Create the Topic and Queue Connection Factory objects
spyTopicConnectionFactory = new
SpyTopicConnectionFactory(distributedConnectionFactory);
spyQueueConnectionFactory = new
SpyQueueConnectionFactory(distributedConnectionFactory);
+ spyXATopicConnectionFactory = new
SpyXATopicConnectionFactory(distributedConnectionFactory);
+ spyXAQueueConnectionFactory = new
SpyXAQueueConnectionFactory(distributedConnectionFactory);
}
+ SpyXAQueueConnectionFactory spyXAQueueConnectionFactory;
+ SpyXATopicConnectionFactory spyXATopicConnectionFactory;
}