User: hiram
Date: 00/12/18 22:43:35
Modified: src/java/org/spydermq SpyTopicSession.java SpySession.java
SpyQueueSession.java SpyQueueReceiver.java
SpyMessageConsumer.java SpyConnection.java
Added: src/java/org/spydermq SpyXAQueueConnection.java
SpyXAQueueConnectionFactory.java SpyXAResource.java
SpyXAResourceManager.java SpyXATopicConnection.java
SpyXATopicConnectionFactory.java
TransactionRequest.java
Removed: src/java/org/spydermq Transaction.java
Log:
Add XA support! Well... I haven't tested it very much, but it's a start.
Revision Changes Path
1.10 +17 -4 spyderMQ/src/java/org/spydermq/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- SpyTopicSession.java 2000/12/12 05:58:57 1.9
+++ SpyTopicSession.java 2000/12/19 06:43:33 1.10
@@ -13,6 +13,7 @@
import javax.jms.JMSException;
import javax.jms.TopicPublisher;
import javax.jms.TemporaryTopic;
+import javax.jms.XATopicSession;
import java.util.Collection;
import java.util.HashSet;
@@ -25,24 +26,30 @@
import org.spydermq.selectors.Selector;
import org.spydermq.Log;
+
/**
- * This class implements javax.jms.TopicSession
+ * This class implements javax.jms.TopicSession and javax.jms.XATopicSession
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SpyTopicSession
extends SpySession
- implements TopicSession
+ implements TopicSession, javax.jms.XATopicSession
{
// Constructor ---------------------------------------------------
+ SpyTopicSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop, boolean xaSession)
+ {
+ super(myConnection,transacted,acknowledgeMode,stop,xaSession);
+ }
+
SpyTopicSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
{
- super(myConnection,transacted,acknowledgeMode,stop);
+ this(myConnection,transacted,acknowledgeMode,stop,false);
}
// Public --------------------------------------------------------
@@ -110,4 +117,10 @@
//Not yet implemented
}
+ /**
+ * getTopicSession method comment.
+ */
+ public javax.jms.TopicSession getTopicSession() throws javax.jms.JMSException {
+ return this;
+ }
}
1.15 +90 -104 spyderMQ/src/java/org/spydermq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- SpySession.java 2000/12/13 15:59:10 1.14
+++ SpySession.java 2000/12/19 06:43:33 1.15
@@ -16,6 +16,8 @@
import javax.jms.MessageListener;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
import java.io.Serializable;
import java.io.File;
@@ -25,16 +27,17 @@
import java.util.Iterator;
import java.util.Collection;
+
/**
- * This class implements javax.jms.Session
+ * This class implements javax.jms.Session and javax.jms.XASession
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.14 $
+ * @version $Revision: 1.15 $
*/
public class SpySession
- implements Runnable, Session
+ implements Runnable, Session, XASession
{
// Attributes ----------------------------------------------------
@@ -46,8 +49,7 @@
private MessageListener messageListener;
//The connection object to which this session is linked
protected SpyConnection connection;
- //The outgoing message queue
- protected LinkedList outgoingQueue;
+
//Is my connection in stopped mode ?
protected boolean modeStop;
//Is the session closed ?
@@ -57,16 +59,23 @@
//MessageConsumers created by this session
protected HashSet consumers;
+ //The transactionId of the current transaction (registered with the
SpyXAResourceManager)
+ Object currentTransactionId;
+ // If this is an XASession, we have an associated XAResource
+ SpyXAResource spyXAResource;
+
// Constructor ---------------------------------------------------
- SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop)
+ SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop,
boolean xaSession)
{
connection=conn;
- transacted=trans;
+ transacted=trans;
acknowledgeMode=acknowledge;
- outgoingQueue=new LinkedList();
modeStop=stop;
+ if( xaSession )
+ spyXAResource = new SpyXAResource(this);
+
messageListener=null;
closed=false;
mutex=new Mutex();
@@ -79,9 +88,14 @@
//Wait for the thread to sleep
mutex.waitLocked();
+
+ //Have a TX ready with the resource manager.
+ if( spyXAResource==null && transacted )
+ currentTransactionId =
connection.spyXAResourceManager.startTx();
}
+
// Public --------------------------------------------------------
public BytesMessage createBytesMessage() throws JMSException
@@ -246,9 +260,11 @@
//Commit a transacted session
public synchronized void commit() throws JMSException
{
+ if (spyXAResource!=null) throw new
javax.jms.TransactionInProgressException("Should not be call from a XASession");
if (closed) throw new IllegalStateException("The session is closed");
if (!transacted) throw new IllegalStateException("The session is not
transacted");
+
Log.log("Session: commit()");
boolean modeSav=modeStop;
@@ -257,23 +273,21 @@
//Wait for the thread to sleep
synchronized (mutex) {
mutex.waitToSleep();
- Transaction transaction = new Transaction();
-
- // Send to server all published messages
- if (outgoingQueue.size()!=0) {
- SpyMessage job[]=new SpyMessage[outgoingQueue.size()];
- job=(SpyMessage[])outgoingQueue.toArray(job);
- transaction.messages = job;
- }
-
- //Clear the outgoing queue
- outgoingQueue.clear();
-
- //Acknowlege all consumed messages
- transaction.acks = removeAcknowledgementItems();
-
- connection.send(transaction);
-
+
+ // commit transaction with onePhase commit
+ try {
+
connection.spyXAResourceManager.commit(currentTransactionId, true);
+ } catch ( javax.transaction.xa.XAException e ) {
+ connection.failureHandler(e,"Could not commit");
+ } finally {
+ try {
+
connection.spyXAResourceManager.endTx(currentTransactionId, true);
+ } catch ( Exception ignore ) {}
+ try {
+ currentTransactionId =
connection.spyXAResourceManager.startTx();
+ } catch ( Exception ignore ) {}
+ }
+
//We have finished our work, we can wake up the thread
modeStop=modeSav;
mutex.notifyLock();
@@ -284,6 +298,7 @@
//Rollback a transacted session
public synchronized void rollback() throws JMSException
{
+ if (spyXAResource!=null) throw new
javax.jms.TransactionInProgressException("Should not be call from a XASession");
if (closed) throw new IllegalStateException("The session is closed");
if (!transacted) throw new IllegalStateException("The session is not
transacted");
@@ -296,19 +311,20 @@
synchronized (mutex) {
mutex.waitToSleep();
-
- Transaction transaction = new Transaction();
- //Clear the outgoing queue
- outgoingQueue.clear();
-
- //Neg Acknowlege all consumed messages
- transaction.acks = removeAcknowledgementItems();
- for( int i=0; i < transaction.acks.length; i++ ) {
- transaction.acks[i].isAck = false;
- }
-
- connection.send(transaction);
+ // rollback transaction
+ try {
+
connection.spyXAResourceManager.rollback(currentTransactionId);
+ } catch ( javax.transaction.xa.XAException e ) {
+ connection.failureHandler(e,"Could not commit");
+ } finally {
+ try {
+
connection.spyXAResourceManager.endTx(currentTransactionId, true);
+ } catch ( Exception ignore ) {}
+ try {
+ currentTransactionId =
connection.spyXAResourceManager.startTx();
+ } catch ( Exception ignore ) {}
+ }
//We have finished our work, we can wake up the thread
modeStop=modeSav;
@@ -323,30 +339,9 @@
Log.log("Session: recover()");
- boolean modeSav=modeStop;
- modeStop=true;
+ rollback();
- //Wait for the thread to sleep
- synchronized (mutex) {
-
- mutex.waitToSleep();
-
- Transaction transaction = new Transaction();
-
- //Neg Acknowlege all consumed messages
- transaction.acks = removeAcknowledgementItems();
- for( int i=0; i < transaction.acks.length; i++ ) {
- transaction.acks[i].isAck = false;
- }
-
- connection.send(transaction);
-
- //We have finished our work, we can wake up the thread
- modeStop=modeSav;
- mutex.notifyLock();
- }
-
}
public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
@@ -379,18 +374,8 @@
if (modeStop==newValue) return;
modeStop=newValue;
-
- if (modeStop) {
-
- //Wait for the thread to sleep
- mutex.waitToSleep();
-
- } else {
-
- //Wake up the thread
- mutex.notifyLock();
-
- }
+
+ notifyStopMode();
}
@@ -423,49 +408,50 @@
}
- //Get all the messages that have been consumed but need acks
- private SpyAcknowledgementItem[] removeAcknowledgementItems()
- {
- //Count the messages
- int i=0;
- Iterator iter=consumers.iterator();
- while (iter.hasNext()) {
- SpyMessageConsumer mc=(SpyMessageConsumer)iter.next();
- i += mc.messagesConsumed.size();
- }
-
- //Now fill the array
- SpyAcknowledgementItem items[] = new SpyAcknowledgementItem[i];
- i=0;
- iter=consumers.iterator();
- while (iter.hasNext()) {
- SpyMessageConsumer mc=(SpyMessageConsumer)iter.next();
- Iterator mesIter = mc.messagesConsumed.iterator();
- while(mesIter.hasNext()) {
- String messageId = (String)mesIter.next();
- SpyAcknowledgementItem item = new
SpyAcknowledgementItem();
- item.jmsDestination = mc.destination;
- item.jmsMessageID = messageId;
- item.isAck = true;
- items[i++] = item;
- }
- mc.messagesConsumed.clear();
- }
-
- return items;
- }
+
//called by a MessageProducer object which needs to publish a message
void sendMessage(SpyMessage m) throws JMSException {
if (closed)
throw new IllegalStateException("The session is closed");
-
+
if( transacted ) {
- outgoingQueue.add(m);
+
connection.spyXAResourceManager.addMessage(currentTransactionId, m);
} else {
connection.sendToServer(m);
}
}
+
+
+
+ /**
+ * Returns this session's XAResource (null unless it was created as an XA session).
+ */
+ public javax.transaction.xa.XAResource getXAResource() {
+ return spyXAResource;
+ }
+
+ // The XA transaction state may have changed... either it started or ended
+ // We have to wait until message delivery has stopped or wake up the thread
+ void notifyStopMode()
+ {
+ // Should it be stopped because we are outside the XA transaction
(start/end)
+ boolean xaStop = spyXAResource!=null && currentTransactionId==null;
+
+ if ( modeStop || xaStop ) {
+
+ //Wait for the thread to sleep
+ mutex.waitToSleep();
+
+ } else {
+
+ //Wake up the thread
+ mutex.notifyLock();
+
+ }
+
+ }
+
}
1.8 +19 -5 spyderMQ/src/java/org/spydermq/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SpyQueueSession.java 2000/12/12 05:58:58 1.7
+++ SpyQueueSession.java 2000/12/19 06:43:33 1.8
@@ -15,6 +15,7 @@
import javax.jms.TemporaryQueue;
import javax.jms.QueueBrowser;
import javax.jms.DeliveryMode;
+import javax.jms.XAQueueSession;
import java.util.HashSet;
import java.util.HashMap;
@@ -22,25 +23,30 @@
/**
- * This class implements javax.jms.QueueSession
+ * This class implements javax.jms.QueueSession and javax.jms.XAQueueSession
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class SpyQueueSession
extends SpySession
- implements QueueSession
+ implements QueueSession, XAQueueSession
{
// Constructor ---------------------------------------------------
- SpyQueueSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
+ SpyQueueSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop, boolean xaSession)
{
- super(myConnection,transacted,acknowledgeMode,stop);
+ super(myConnection,transacted,acknowledgeMode,stop, xaSession);
}
+ SpyQueueSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
+ {
+ this(myConnection,transacted,acknowledgeMode,stop, false);
+ }
+
// Public --------------------------------------------------------
public QueueBrowser createBrowser(Queue queue) throws JMSException
@@ -97,4 +103,12 @@
return ((SpyQueueConnection)connection).getTemporaryQueue();
}
+
+ /**
+ * getQueueSession method comment.
+ */
+ public QueueSession getQueueSession() throws javax.jms.JMSException {
+ return this;
+ }
+
}
1.6 +3 -2 spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- SpyQueueReceiver.java 2000/11/19 19:59:57 1.5
+++ SpyQueueReceiver.java 2000/12/19 06:43:34 1.6
@@ -16,8 +16,9 @@
* This class implements javax.jms.QueueReceiver
*
* @author Norbert Lataille ([EMAIL PROTECTED])
+ * @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class SpyQueueReceiver
extends SpyMessageConsumer
@@ -208,7 +209,7 @@
messageListener.onMessage(mes);
if( session.transacted ) {
- messagesConsumed.addLast(mes.getJMSMessageID());
+
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
} else if( session.acknowledgeMode==session.AUTO_ACKNOWLEDGE
|| session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
mes.doAcknowledge();
}
1.6 +6 -8 spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- SpyMessageConsumer.java 2000/12/13 15:59:09 1.5
+++ SpyMessageConsumer.java 2000/12/19 06:43:34 1.6
@@ -12,8 +12,10 @@
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Destination;
+
import java.util.LinkedList;
import java.util.Iterator;
+
import org.spydermq.selectors.Selector;
/**
@@ -22,7 +24,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class SpyMessageConsumer
implements MessageConsumer
@@ -45,8 +47,7 @@
//Is the consumer sleeping in a receive() ?
boolean waitInReceive;
public Destination destination;
- //If the session is transacted: contains JMSMessageId's of messages consumed
- LinkedList messagesConsumed;
+
//Am I in noLocal mode ?
boolean noLocal;
@@ -61,9 +62,6 @@
messageSelector=null;
messages=new LinkedList();
waitInReceive=false;
- if( session.transacted ) {
- messagesConsumed = new LinkedList();
- }
}
@@ -170,7 +168,7 @@
message.setSpySession(session);
if( session.transacted ) {
-
messagesConsumed.addLast(message.getJMSMessageID());
+
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId,
message);
} else if(
session.acknowledgeMode==session.AUTO_ACKNOWLEDGE ||
session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
message.doAcknowledge();
}
@@ -216,7 +214,7 @@
messageListener.onMessage(mes);
if( session.transacted ) {
-
messagesConsumed.addLast(mes.getJMSMessageID());
+
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
} else if(
session.acknowledgeMode==session.AUTO_ACKNOWLEDGE ||
session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
mes.doAcknowledge();
}
1.17 +15 -14 spyderMQ/src/java/org/spydermq/SpyConnection.java
Index: SpyConnection.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- SpyConnection.java 2000/12/12 05:58:56 1.16
+++ SpyConnection.java 2000/12/19 06:43:34 1.17
@@ -32,7 +32,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.16 $
+ * @version $Revision: 1.17 $
*/
public class SpyConnection
implements Connection, Serializable
@@ -60,6 +60,8 @@
String crClassName;
//the exceptionListener
private ExceptionListener exceptionListener;
+ // Used to control transactions
+ SpyXAResourceManager spyXAResourceManager;
// Constructor ---------------------------------------------------
SpyConnection(DistributedJMSServer theServer,String cID,String crCN) throws
JMSException
@@ -74,6 +76,7 @@
modeStop=true;
clientID=cID;
crClassName=crCN;
+ spyXAResourceManager = new SpyXAResourceManager(this);
}
// Public --------------------------------------------------------
@@ -537,20 +540,8 @@
failureHandler(e,"Cannot acknowlege a message.");
}
}
-
- // Used to commit/rollback a transaction.
- protected void send(Transaction transaction) throws JMSException {
-
- try {
- provider.transact(distributedConnection, transaction);
- } catch (Exception e) {
- failureHandler(e,"Cannot process a transaction.");
- }
-
- }
-
+
//Send a message to the provider
- //[We should try to locally dispatch the message...]
void sendToServer(SpyMessage mes) throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
@@ -565,4 +556,14 @@
}
}
+ // Used to commit/rollback a transaction.
+ protected void send(TransactionRequest transaction) throws JMSException {
+
+ try {
+ provider.transact(distributedConnection, transaction);
+ } catch (Exception e) {
+ failureHandler(e,"Cannot process a transaction.");
+ }
+
+ }
}
1.1 spyderMQ/src/java/org/spydermq/SpyXAQueueConnection.java
Index: SpyXAQueueConnection.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.XAQueueSession;
import javax.jms.XAQueueConnection;
import java.io.Serializable;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
/**
* This class implements javax.jms.XAQueueConnection
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyXAQueueConnection
extends SpyQueueConnection
implements Serializable, XAQueueConnection
{
// Constructor ---------------------------------------------------
public SpyXAQueueConnection(DistributedJMSServer theServer,String cID,String
crCN) throws JMSException
{
super(theServer,cID,crCN);
}
// Public --------------------------------------------------------
public QueueSession createQueueSession(boolean transacted, int
acknowledgeMode) throws JMSException
{
return (QueueSession)createXAQueueSession();
}
/**
* createXAQueueSession method comment.
*/
public javax.jms.XAQueueSession createXAQueueSession() throws
javax.jms.JMSException {
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
XAQueueSession session=new SpyQueueSession(this,true,0,modeStop,true);
//add the new session to the createdSessions list
synchronized (createdSessions) {
createdSessions.add(session);
}
return session;
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyXAQueueConnectionFactory.java
Index: SpyXAQueueConnectionFactory.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.QueueConnection;
import javax.jms.XAQueueConnection;
import javax.jms.XAQueueConnectionFactory;
import javax.jms.JMSException;
import org.spydermq.Log;
import org.spydermq.security.SecurityManager;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
import java.io.Serializable;
import java.util.Properties;
/**
 * This class implements javax.jms.XAQueueConnectionFactory.
 * All connections are obtained from the wrapped DistributedConnectionFactory.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class SpyXAQueueConnectionFactory
    implements XAQueueConnectionFactory, Serializable
{

    // Attributes ----------------------------------------------------

    // The factory every create* call is delegated to
    private DistributedConnectionFactory factory;

    // Constructor ---------------------------------------------------

    public SpyXAQueueConnectionFactory(DistributedConnectionFactory factory) throws Exception
    {
        this.factory = factory;
    }

    // Public --------------------------------------------------------

    /**
     * Same as createXAQueueConnection().
     */
    public QueueConnection createQueueConnection() throws JMSException
    {
        return createXAQueueConnection();
    }

    /**
     * Same as createXAQueueConnection(userName, password).
     */
    public QueueConnection createQueueConnection(String userName, String password) throws JMSException
    {
        return createXAQueueConnection(userName, password);
    }

    /**
     * Logs the failure and always throws a JMSException carrying
     * <code>reason</code>. The original exception is attached as the
     * linked exception so that the root cause is not lost.
     */
    private void failureHandler(Exception e, String reason) throws JMSException
    {
        Log.error(e);
        JMSException failure = new JMSException(reason);
        failure.setLinkedException(e);
        throw failure;
    }

    /**
     * Asks the wrapped factory for a new XA queue connection.
     *
     * @exception javax.jms.JMSException if the wrapped factory fails;
     *            a JMSException from the factory is rethrown as is.
     */
    public javax.jms.XAQueueConnection createXAQueueConnection() throws javax.jms.JMSException
    {
        try {
            return factory.createXAQueueConnection();
        } catch (JMSException e) {
            throw e;
        } catch (Exception e) {
            failureHandler(e, "createQueueConnection has failed !");
            // Not reached: failureHandler always throws.
            return null;
        }
    }

    /**
     * Asks the wrapped factory for a new XA queue connection for the
     * given user.
     *
     * @exception javax.jms.JMSException if userName or password is null,
     *            or if the wrapped factory fails.
     */
    public javax.jms.XAQueueConnection createXAQueueConnection(String userName, String password) throws javax.jms.JMSException
    {
        try {
            if (userName == null || password == null) {
                throw new JMSException("Invalid login or password !");
            }
            return factory.createXAQueueConnection(userName, password);
        } catch (JMSException e) {
            throw e;
        } catch (Exception e) {
            failureHandler(e, "createQueueConnection has failed !");
            // Not reached: failureHandler always throws.
            return null;
        }
    }

}
1.1 spyderMQ/src/java/org/spydermq/SpyXAResource.java
Index: SpyXAResource.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.JMSException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
/**
 * This class implements the XAResource interface for use with an XASession.
 * Every call is forwarded to the SpyXAResourceManager of the session's
 * connection; this class only tracks which transaction the session is
 * currently associated with (session.currentTransactionId).
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class SpyXAResource implements XAResource {

    //////////////////////////////////////////////////////////////////
    // Attributes
    //////////////////////////////////////////////////////////////////

    // The session this resource belongs to
    SpySession session;

    //////////////////////////////////////////////////////////////////
    // Constructors
    //////////////////////////////////////////////////////////////////

    SpyXAResource(SpySession session) {
        this.session = session;
    }

    //////////////////////////////////////////////////////////////////
    // Public Methods
    //////////////////////////////////////////////////////////////////

    /**
     * Commits the transaction and, once the commit has succeeded,
     * releases the state the resource manager holds for it.
     *
     * @exception javax.transaction.xa.XAException XAER_RMERR if the
     *            request could not be sent, or whatever the resource
     *            manager reports (e.g. XAER_NOTA for an unknown xid).
     */
    public void commit(javax.transaction.xa.Xid xid, boolean onePhase) throws javax.transaction.xa.XAException {
        SpyXAResourceManager manager = session.connection.spyXAResourceManager;
        try {
            manager.commit(xid, onePhase);
        } catch (JMSException e) {
            // State is kept on failure so the transaction can still be rolled back.
            throw new XAException(XAException.XAER_RMERR);
        }
        // The transaction is complete: drop its recorded work.
        manager.endTx(xid, true);
    }

    /**
     * Dissociates the session from the transaction.
     *
     * The recorded work of the transaction is NOT discarded here:
     * prepare/commit/rollback are called after end() and still need it.
     * The state is released by commit() and rollback() instead.
     *
     * @exception javax.transaction.xa.XAException XAER_OUTSIDE if the
     *            session is not inside a transaction, XAER_NOTA if the
     *            xid is unknown, XAER_INVAL if flags is not one of
     *            TMSUSPEND, TMFAIL or TMSUCCESS.
     */
    public void end(javax.transaction.xa.Xid xid, int flags) throws javax.transaction.xa.XAException {
        if (session.currentTransactionId == null) {
            throw new XAException(XAException.XAER_OUTSIDE);
        }

        SpyXAResourceManager manager = session.connection.spyXAResourceManager;
        switch (flags) {
            case TMSUSPEND :
                session.currentTransactionId = null;
                manager.suspendTx(xid);
                break;
            case TMFAIL :
            case TMSUCCESS :
                session.currentTransactionId = null;
                // Only verify that the transaction exists; keep its state.
                if (!manager.transactions.containsKey(xid)) {
                    throw new XAException(XAException.XAER_NOTA);
                }
                break;
            default :
                throw new XAException(XAException.XAER_INVAL);
        }

        // The session left the transaction: let it re-evaluate message delivery.
        session.notifyStopMode();
    }

    /**
     * Does nothing.
     */
    public void forget(javax.transaction.xa.Xid arg1) throws javax.transaction.xa.XAException {
    }

    /**
     * Transaction timeouts are not supported; always returns 0.
     */
    public int getTransactionTimeout() throws javax.transaction.xa.XAException {
        return 0;
    }

    /**
     * Two resources are the same resource manager when they are both
     * SpyXAResources whose sessions share one SpyXAResourceManager
     * instance.
     */
    public boolean isSameRM(javax.transaction.xa.XAResource arg1) throws javax.transaction.xa.XAException {
        if (!(arg1 instanceof SpyXAResource)) {
            return false;
        }
        SpyXAResource other = (SpyXAResource) arg1;
        return other.session.connection.spyXAResourceManager == session.connection.spyXAResourceManager;
    }

    /**
     * Asks the resource manager to prepare the transaction.
     *
     * @exception javax.transaction.xa.XAException XAER_RMERR if the
     *            request could not be sent, or whatever the resource
     *            manager reports.
     */
    public int prepare(javax.transaction.xa.Xid xid) throws javax.transaction.xa.XAException {
        try {
            return session.connection.spyXAResourceManager.prepare(xid);
        } catch (JMSException e) {
            throw new XAException(XAException.XAER_RMERR);
        }
    }

    /**
     * Recovery is not implemented; always returns an empty array.
     */
    public Xid[] recover(int arg1) throws javax.transaction.xa.XAException {
        return new Xid[0];
    }

    /**
     * Rolls the transaction back and, once the rollback has succeeded,
     * releases the state the resource manager holds for it.
     *
     * @exception javax.transaction.xa.XAException XAER_RMERR if the
     *            request could not be sent, or whatever the resource
     *            manager reports.
     */
    public void rollback(javax.transaction.xa.Xid xid) throws javax.transaction.xa.XAException {
        SpyXAResourceManager manager = session.connection.spyXAResourceManager;
        try {
            manager.rollback(xid);
        } catch (JMSException e) {
            throw new XAException(XAException.XAER_RMERR);
        }
        // The transaction is complete: drop its recorded work.
        manager.endTx(xid, false);
    }

    /**
     * Transaction timeouts are not supported; always returns false.
     */
    public boolean setTransactionTimeout(int arg1) throws javax.transaction.xa.XAException {
        return false;
    }

    /**
     * Associates the session with a transaction.
     *
     * @exception javax.transaction.xa.XAException XAER_OUTSIDE if the
     *            session is already inside a transaction, XAER_INVAL if
     *            flags is not one of TMNOFLAGS, TMJOIN or TMRESUME, or
     *            whatever the resource manager reports.
     */
    public void start(javax.transaction.xa.Xid xid, int flags) throws javax.transaction.xa.XAException {
        if (session.currentTransactionId != null) {
            throw new XAException(XAException.XAER_OUTSIDE);
        }

        SpyXAResourceManager manager = session.connection.spyXAResourceManager;
        switch (flags) {
            case TMNOFLAGS :
                session.currentTransactionId = manager.startTx(xid);
                break;
            case TMJOIN :
                session.currentTransactionId = manager.joinTx(xid);
                break;
            case TMRESUME :
                session.currentTransactionId = manager.resumeTx(xid);
                break;
            default :
                throw new XAException(XAException.XAER_INVAL);
        }

        // The session entered a transaction: let it re-evaluate message delivery.
        session.notifyStopMode();
    }

}
1.1 spyderMQ/src/java/org/spydermq/SpyXAResourceManager.java
Index: SpyXAResourceManager.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import java.util.HashMap;
import java.util.LinkedList;
import javax.transaction.xa.Xid;
import javax.transaction.xa.XAException;
import javax.jms.JMSException;
/**
* This class implements the ResourceManager used for the XAResources
* used int spyderMQ.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyXAResourceManager implements java.io.Serializable {
//////////////////////////////////////////////////////////////////
// Attributes
//////////////////////////////////////////////////////////////////
SpyConnection connection;
HashMap transactions = new HashMap();
long nextInternalXid = Long.MIN_VALUE;
//Valid tx states:
private static final byte TX_OPEN = 0;
private static final byte TX_SUSPENDED = 1;
private static final byte TX_PREPARED = 2;
private static final byte TX_COMMITED = 3;
private static final byte TX_ROLLEDBACK = 4;
//////////////////////////////////////////////////////////////////
// Helper Inner classes
//////////////////////////////////////////////////////////////////
class TXState {
byte txState = TX_OPEN;
LinkedList sentMessages = new LinkedList();
LinkedList ackedMessages = new LinkedList();
}
//////////////////////////////////////////////////////////////////
// Constructors
//////////////////////////////////////////////////////////////////
public SpyXAResourceManager(SpyConnection conn) {
super();
connection = conn;
}
//////////////////////////////////////////////////////////////////
// Public Methods
//////////////////////////////////////////////////////////////////
public void ackMessage(Object xid, SpyMessage msg) throws JMSException {
TXState state = (TXState) transactions.get(xid);
if (state == null)
throw new JMSException("Invalid transaction id.");
SpyAcknowledgementItem item = new SpyAcknowledgementItem();
item.jmsDestination = msg.getJMSDestination();
item.jmsMessageID = msg.getJMSMessageID();
item.isAck = true;
state.ackedMessages.addLast(item);
}
public void addMessage(Object xid, SpyMessage msg) throws JMSException {
TXState state = (TXState) transactions.get(xid);
if (state == null)
throw new JMSException("Invalid transaction id.");
state.sentMessages.addLast(msg);
}
public void commit(Object xid, boolean onePhase) throws XAException,
JMSException {
TXState state = (TXState) transactions.get(xid);
if (state == null)
throw new XAException(XAException.XAER_NOTA);
if (onePhase) {
TransactionRequest transaction = new TransactionRequest();
transaction.requestType = transaction.ONE_PHASE_COMMIT_REQUEST;
transaction.xid = null;
if (state.sentMessages.size() != 0) {
SpyMessage job[] = new
SpyMessage[state.sentMessages.size()];
job = (SpyMessage[]) state.sentMessages.toArray(job);
transaction.messages = job;
}
if (state.ackedMessages.size() != 0) {
SpyAcknowledgementItem job[] = new
SpyAcknowledgementItem[state.ackedMessages.size()];
job = (SpyAcknowledgementItem[])
state.ackedMessages.toArray(job);
transaction.acks = job;
}
connection.send(transaction);
} else {
if (state.txState != TX_PREPARED)
throw new XAException("The transaction had not been
prepared");
TransactionRequest transaction = new TransactionRequest();
transaction.xid = xid;
transaction.requestType =
transaction.TWO_PHASE_COMMIT_COMMIT_REQUEST;
connection.send(transaction);
}
state.txState = TX_COMMITED;
}
public void endTx(Object xid, boolean success) throws XAException {
Object state = transactions.remove(xid);
if (state == null)
throw new XAException(XAException.XAER_NOTA);
}
public Object joinTx(Xid xid) throws XAException {
if (!transactions.containsKey(xid))
throw new XAException(XAException.XAER_NOTA);
return xid;
}
public int prepare(Object xid) throws XAException, JMSException {
TXState state = (TXState) transactions.get(xid);
if (state == null)
throw new XAException(XAException.XAER_NOTA);
TransactionRequest transaction = new TransactionRequest();
transaction.requestType = transaction.TWO_PHASE_COMMIT_PREPARE_REQUEST;
transaction.xid = xid;
if (state.sentMessages.size() != 0) {
SpyMessage job[] = new SpyMessage[state.sentMessages.size()];
job = (SpyMessage[]) state.sentMessages.toArray(job);
transaction.messages = job;
}
if (state.ackedMessages.size() != 0) {
SpyAcknowledgementItem job[] = new
SpyAcknowledgementItem[state.ackedMessages.size()];
job = (SpyAcknowledgementItem[])
state.ackedMessages.toArray(job);
transaction.acks = job;
}
connection.send(transaction);
state.txState = TX_PREPARED;
return javax.transaction.xa.XAResource.XA_OK;
}
public Object resumeTx(Xid xid) throws XAException {
if (!transactions.containsKey(xid))
throw new XAException(XAException.XAER_NOTA);
return xid;
}
public void rollback(Object xid) throws XAException, JMSException {
TXState state = (TXState) transactions.get(xid);
if (state == null)
throw new XAException(XAException.XAER_NOTA);
if (state.txState != TX_PREPARED) {
TransactionRequest transaction = new TransactionRequest();
transaction.requestType = transaction.ONE_PHASE_COMMIT_REQUEST;
transaction.xid = null;
if (state.ackedMessages.size() != 0) {
SpyAcknowledgementItem job[] = new
SpyAcknowledgementItem[state.ackedMessages.size()];
job = (SpyAcknowledgementItem[])
state.ackedMessages.toArray(job);
transaction.acks = job;
//Neg Acknowlege all consumed messages
for (int i = 0; i < transaction.acks.length; i++) {
transaction.acks[i].isAck = false;
}
}
connection.send(transaction);
} else {
TransactionRequest transaction = new TransactionRequest();
transaction.xid = xid;
transaction.requestType =
transaction.TWO_PHASE_COMMIT_ROLLBACK_REQUEST;
connection.send(transaction);
}
state.txState = TX_ROLLEDBACK;
}
/**
 * Starts a new transaction under an internally generated identifier.
 *
 * The identifier is the current value of nextInternalXid wrapped in a Long;
 * the counter is incremented as a side effect.
 *
 * @return the Long key under which the new TXState was registered
 */
public Object startTx() {
    final Long internalXid = new Long(nextInternalXid++);
    transactions.put(internalXid, new TXState());
    return internalXid;
}
/**
 * Starts a new transaction under an externally supplied Xid.
 *
 * @param xid identifier for the new transaction
 * @return the same xid that was passed in
 * @throws XAException with XAER_DUPID when the xid is already registered
 */
public Object startTx(Xid xid) throws XAException {
    boolean alreadyRegistered = transactions.containsKey(xid);
    if (alreadyRegistered) {
        throw new XAException(XAException.XAER_DUPID);
    }
    transactions.put(xid, new TXState());
    return xid;
}
/**
 * Suspends work on an already registered transaction.
 *
 * No state is changed here; the method only verifies that the xid is known.
 *
 * @param xid identifier of the transaction to suspend
 * @return the same xid that was passed in
 * @throws XAException with XAER_NOTA when the xid is not registered
 */
public Object suspendTx(Xid xid) throws XAException {
    if (transactions.containsKey(xid)) {
        return xid;
    }
    throw new XAException(XAException.XAER_NOTA);
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyXATopicConnection.java
Index: SpyXATopicConnection.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.XATopicConnection;
import javax.jms.XATopicSession;
import javax.jms.TopicSession;
import javax.jms.JMSException;
import java.io.Serializable;
/**
 * This class implements the XATopicConnection.
 *
 * Every session handed out by this connection, including those requested
 * through the plain createTopicSession call, is an XA-enabled
 * SpyTopicSession.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class SpyXATopicConnection extends SpyTopicConnection
    implements Serializable, XATopicConnection {

    //////////////////////////////////////////////////////////////////
    // Constructors
    //////////////////////////////////////////////////////////////////

    /**
     * Creates the connection by delegating to the SpyTopicConnection
     * constructor.
     *
     * @param theServer server handle, passed through to the superclass
     * @param cID passed through to the superclass (presumably the client ID)
     * @param crCN passed through to the superclass
     * @exception javax.jms.JMSException if the superclass constructor fails
     */
    public SpyXATopicConnection(
            org.spydermq.distributed.interfaces.DistributedJMSServer theServer,
            String cID, String crCN) throws javax.jms.JMSException {
        super(theServer, cID, crCN);
    }

    //////////////////////////////////////////////////////////////////
    // Public Methods
    //////////////////////////////////////////////////////////////////

    /**
     * Returns an XA topic session. Both arguments are ignored: the session
     * is always created by createXATopicSession().
     *
     * @param transacted ignored
     * @param acknowledgeMode ignored
     * @return the session created by createXATopicSession()
     * @exception JMSException if the session cannot be created
     */
    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode)
            throws JMSException {
        return (TopicSession) createXATopicSession();
    }

    /**
     * Creates a transacted, XA-enabled SpyTopicSession (acknowledge mode 0)
     * and registers it in the createdSessions list.
     *
     * @return the newly created session
     * @exception javax.jms.JMSException if the connection is closed (as a
     *            javax.jms.IllegalStateException) or the receiver cannot be
     *            created
     */
    public XATopicSession createXATopicSession() throws javax.jms.JMSException {
        if (closed) {
            // Fully qualified on purpose: the unqualified name would resolve to
            // the unchecked java.lang.IllegalStateException, whereas JMS clients
            // expect the JMSException subclass for use of a closed connection.
            throw new javax.jms.IllegalStateException("The connection is closed");
        }
        if (distributedConnection == null) {
            createReceiver();
        }

        XATopicSession session = new SpyTopicSession(this, true, 0, modeStop, true);

        // add the new session to the createdSessions list
        synchronized (createdSessions) {
            createdSessions.add(session);
        }
        return session;
    }
}
1.1 spyderMQ/src/java/org/spydermq/SpyXATopicConnectionFactory.java
Index: SpyXATopicConnectionFactory.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.XATopicConnectionFactory;
import javax.jms.XATopicConnection;
import javax.jms.JMSException;
import org.spydermq.Log;
import org.spydermq.security.SecurityManager;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
import java.io.Serializable;
import java.util.Properties;
/**
 * This class implements javax.jms.XATopicConnectionFactory.
 *
 * All connection creation is delegated to a DistributedConnectionFactory.
 * The plain createTopicConnection variants also return XA connections.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class SpyXATopicConnectionFactory
    implements XATopicConnectionFactory, Serializable
{
    //////////////////////////////////////////////////////////////////
    // Attributes
    //////////////////////////////////////////////////////////////////

    /** Delegate that actually creates the connections. */
    private DistributedConnectionFactory factory;

    //////////////////////////////////////////////////////////////////
    // Constructors
    //////////////////////////////////////////////////////////////////

    /**
     * Wraps the given delegate factory.
     *
     * @param factory delegate used to create XA topic connections
     * @throws Exception declared for callers; nothing in this constructor
     *         throws it
     */
    public SpyXATopicConnectionFactory(DistributedConnectionFactory factory)
        throws Exception
    {
        this.factory = factory;
    }

    //////////////////////////////////////////////////////////////////
    // Public Methods
    //////////////////////////////////////////////////////////////////

    /**
     * Same as createXATopicConnection().
     *
     * @return a new XA topic connection
     * @throws JMSException if the connection cannot be created
     */
    public TopicConnection createTopicConnection() throws JMSException
    {
        return createXATopicConnection();
    }

    /**
     * Same as createXATopicConnection(String, String).
     *
     * @param userName login, must not be null
     * @param password password, must not be null
     * @return a new XA topic connection
     * @throws JMSException if a credential is null or the connection cannot
     *         be created
     */
    public TopicConnection createTopicConnection(String userName, String password)
        throws JMSException
    {
        return createXATopicConnection(userName, password);
    }

    /**
     * Asks the delegate factory for a new XA topic connection.
     *
     * @return a new XA topic connection
     * @throws JMSException rethrown as-is from the delegate; any other
     *         exception is logged and wrapped in a JMSException
     */
    public XATopicConnection createXATopicConnection() throws JMSException
    {
        try {
            return factory.createXATopicConnection();
        } catch (JMSException e) {
            throw e;
        } catch (Exception e) {
            failureHandler(e, "createTopicConnection has failed !");
            return null; // not reached: failureHandler always throws
        }
    }

    /**
     * Asks the delegate factory for a new XA topic connection using the
     * given credentials.
     *
     * @param userName login, must not be null
     * @param password password, must not be null
     * @return a new XA topic connection
     * @throws JMSException if a credential is null, or rethrown as-is from
     *         the delegate; any other exception is logged and wrapped in a
     *         JMSException
     */
    public XATopicConnection createXATopicConnection(String userName, String password)
        throws JMSException
    {
        try {
            if (userName == null || password == null) {
                throw new JMSException("Invalid login or password !");
            }
            return factory.createXATopicConnection(userName, password);
        } catch (JMSException e) {
            throw e;
        } catch (Exception e) {
            failureHandler(e, "createTopicConnection has failed !");
            return null; // not reached: failureHandler always throws
        }
    }

    //////////////////////////////////////////////////////////////////
    // Private Methods
    //////////////////////////////////////////////////////////////////

    /**
     * Logs the underlying failure and rethrows it as a JMSException.
     * The original exception is attached as the linked exception so that
     * callers can still reach the root cause.
     *
     * @param e underlying failure
     * @param reason message for the JMSException that is thrown
     * @throws JMSException always
     */
    private void failureHandler(Exception e, String reason) throws JMSException
    {
        Log.error(e);
        JMSException failure = new JMSException(reason);
        failure.setLinkedException(e);
        throw failure;
    }
}
1.1 spyderMQ/src/java/org/spydermq/TransactionRequest.java
Index: TransactionRequest.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import java.io.Serializable;
/**
 * This class contains all the data needed to perform a JMS transaction.
 *
 * It is a plain serializable holder: a request type, an optional
 * transaction identifier, and the messages and acknowledgements that make
 * up the transaction.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class TransactionRequest implements Serializable {

    // ---- Valid request types, listed in numeric order ----

    /** Request type code: one-phase commit. */
    public static final byte ONE_PHASE_COMMIT_REQUEST = 0;

    /** Request type code: prepare step of a two-phase commit. */
    public static final byte TWO_PHASE_COMMIT_PREPARE_REQUEST = 1;

    /** Request type code: commit step of a two-phase commit. */
    public static final byte TWO_PHASE_COMMIT_COMMIT_REQUEST = 2;

    /** Request type code: rollback step of a two-phase commit. */
    public static final byte TWO_PHASE_COMMIT_ROLLBACK_REQUEST = 3;

    // ---- Request payload ----

    /** Type of this request; one of the constants above. Defaults to one-phase commit. */
    public byte requestType = ONE_PHASE_COMMIT_REQUEST;

    /** For two-phase commit, this identifies the transaction. */
    public Object xid;

    /** Messages sent in the transaction. */
    public SpyMessage[] messages;

    /** Messages acknowledged in the transaction. */
    public SpyAcknowledgementItem[] acks;
}