User: hiram
Date: 00/12/21 14:33:57
Modified: src/java/org/spydermq SpyConnection.java
SpyMessageConsumer.java SpyQueueConnection.java
SpyQueueReceiver.java SpyQueueSession.java
SpySession.java SpyTopicSession.java
SpyTopicSubscriber.java SpyXAResource.java
Added: src/java/org/spydermq SpyConnectionConsumer.java
SpyConsumer.java
Log:
Added ConnectionConsumer so that work on the ASF part of
spyderMQ can start.
Revision Changes Path
1.18 +344 -335 spyderMQ/src/java/org/spydermq/SpyConnection.java
Index: SpyConnection.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- SpyConnection.java 2000/12/19 06:43:34 1.17
+++ SpyConnection.java 2000/12/21 22:33:55 1.18
@@ -13,6 +13,7 @@
import javax.jms.JMSException;
import javax.jms.ConnectionMetaData;
import javax.jms.ExceptionListener;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.Collection;
@@ -22,24 +23,24 @@
import java.io.FileInputStream;
import java.io.File;
import java.io.IOException;
+
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
-
/**
* This class implements javax.jms.Connection
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.17 $
+ * @version $Revision: 1.18 $
*/
-public class SpyConnection
- implements Connection, Serializable
-{
-
- // Attributes ----------------------------------------------------
+public class SpyConnection implements Connection, Serializable {
+ //////////////////////////////////////////////////////////////
+ // Attributes
+ //////////////////////////////////////////////////////////////
+
// This is our connection to the JMS server
protected DistributedJMSServer provider;
//This is the clientID
@@ -57,290 +58,274 @@
//Is the connection closed ?
boolean closed;
//Name of the connectionReceiver class
- String crClassName;
+ String crClassName;
//the exceptionListener
private ExceptionListener exceptionListener;
// Used to control tranactions
SpyXAResourceManager spyXAResourceManager;
+
+ //////////////////////////////////////////////////////////////
+ // Constructors
+ //////////////////////////////////////////////////////////////
- // Constructor ---------------------------------------------------
- SpyConnection(DistributedJMSServer theServer,String cID,String crCN) throws
JMSException
- {
+ SpyConnection(DistributedJMSServer theServer, String cID, String crCN) throws
JMSException {
//Set the attributes
provider = theServer;
- destinations=new HashMap();
- createdSessions=new HashSet();
- distributedConnection=null;
- closed=false;
- lastMessageID=0;
- modeStop=true;
- clientID=cID;
- crClassName=crCN;
+ destinations = new HashMap();
+ createdSessions = new HashSet();
+ distributedConnection = null;
+ closed = false;
+ lastMessageID = 0;
+ modeStop = true;
+ clientID = cID;
+ crClassName = crCN;
spyXAResourceManager = new SpyXAResourceManager(this);
}
-
- // Public --------------------------------------------------------
-
- //<DEBUG>
-
- public int rec=0;
-
- //</DEBUG>
- public String getClientID() throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
+ //////////////////////////////////////////////////////////////
+ // Public Methods
+ //////////////////////////////////////////////////////////////
+
+ public String getClientID() throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
return clientID;
}
+ public void setClientID(String cID) throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (clientID != null)
+ throw new IllegalStateException("The connection has already a
clientID");
- public void setClientID(String cID) throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is closed");
- if (clientID!=null) throw new IllegalStateException("The connection
has already a clientID");
+ Log.log("SetClientID(" + clientID + ")");
- Log.log("SetClientID("+clientID+")");
-
- try {
+ try {
provider.checkID(cID);
} catch (JMSException e) {
throw e;
} catch (Exception e) {
- failureHandler(e,"Cannot connect to the JMSServer");
+ failureHandler(e, "Cannot connect to the JMSServer");
}
-
- clientID=cID;
+
+ clientID = cID;
}
-
- public ConnectionMetaData getMetaData() throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
-
+
+ public ConnectionMetaData getMetaData() throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+
return new SpyConnectionMetaData();
}
- public ExceptionListener getExceptionListener() throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
-
- return exceptionListener;
- }
+ public ExceptionListener getExceptionListener() throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (distributedConnection == null)
+ createReceiver();
- public void setExceptionListener(ExceptionListener listener) throws
JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
-
- exceptionListener=listener;
+ return exceptionListener;
}
- public void start() throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
-
- if (!modeStop) return;
- modeStop=false;
+ public void setExceptionListener(ExceptionListener listener) throws
JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (distributedConnection == null)
+ createReceiver();
+
+ exceptionListener = listener;
+ }
+
+ public void start() throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (distributedConnection == null)
+ createReceiver();
+
+ if (!modeStop)
+ return;
+ modeStop = false;
- Iterator i=destinations.keySet().iterator();
+ Iterator i = destinations.keySet().iterator();
while (i.hasNext()) {
- Destination d=(Destination)i.next();
- ConsumerSet ci=(ConsumerSet)destinations.get(d);
+ Destination d = (Destination) i.next();
+ ConsumerSet ci = (ConsumerSet) destinations.get(d);
- if ( ci.getLasListeningState() ) {
+ if (ci.getLasListeningState()) {
try {
-
provider.connectionListening(distributedConnection, true,d);
- } catch ( Exception e ) {
+
provider.connectionListening(distributedConnection, true, d);
+ } catch (Exception e) {
failureHandler(e, "Cannot contact the JMS
server");
}
}
-
+
}
- changeModeStop(modeStop);
+ changeModeStop(modeStop);
}
- public void stop() throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
-
- if (modeStop) return;
- modeStop=true;
+ public void stop() throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (distributedConnection == null)
+ createReceiver();
+
+ if (modeStop)
+ return;
+ modeStop = true;
- Iterator i=destinations.keySet().iterator();
+ Iterator i = destinations.keySet().iterator();
while (i.hasNext()) {
- Destination d=(Destination)i.next();
- ConsumerSet ci=(ConsumerSet)destinations.get(d);
+ Destination d = (Destination) i.next();
+ ConsumerSet ci = (ConsumerSet) destinations.get(d);
- if ( ci.getLasListeningState() ) {
+ if (ci.getLasListeningState()) {
try {
-
provider.connectionListening(distributedConnection, false,d);
- } catch ( Exception e ) {
+
provider.connectionListening(distributedConnection, false, d);
+ } catch (Exception e) {
failureHandler(e, "Cannot contact the JMS
server");
}
}
-
+
}
changeModeStop(modeStop);
}
- public synchronized void close() throws JMSException
- {
- if (closed) return;
+ public synchronized void close() throws JMSException {
+ if (closed)
+ return;
//Get an ID / ConnectionReciever
- if (distributedConnection==null) createReceiver();
-
+ if (distributedConnection == null)
+ createReceiver();
+
+ Log.log("Closing sessions");
//notify his sessions
synchronized (createdSessions) {
-
- Object[] vect=createdSessions.toArray();
- for(int i=0;i<vect.length;i++) {
- ((SpySession)vect[i]).close();
+
+ Object[] vect = createdSessions.toArray();
+ for (int i = 0; i < vect.length; i++) {
+ ((SpySession) vect[i]).close();
}
-
+
}
+ Log.log("Closed sessions");
+ Log.log("Disconnecting from server");
//Notify the JMSServer that I am closing
try {
provider.connectionClosing(distributedConnection);
distributedConnection.close();
} catch (Exception e) {
- failureHandler(e,"Cannot close properly the connection");
+ failureHandler(e, "Cannot close properly the connection");
}
-
+ Log.log("Disconnected from server");
+
// Only set the closed flag after all the objects that depend
// on this connection have been closed.
- closed=true;
- }
+ closed = true;
+ }
//called by a TemporaryDestination which is going to be deleted()
- public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
+ public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (distributedConnection == null)
+ createReceiver();
- Log.log("SpyConnection: deleteDestination(dest="+dest.toString()+")");
-
+ Log.log("SpyConnection: deleteDestination(dest=" + dest.toString() +
")");
+
try {
-
+
//Remove it from the destinations list
synchronized (destinations) {
- HashMap newMap=(HashMap)destinations.clone();
+ HashMap newMap = (HashMap) destinations.clone();
newMap.remove(dest);
- destinations=newMap;
+ destinations = newMap;
}
-
+
//Notify its sessions that this TemporaryDestination is going
to be deleted()
//We could do that only on the Sessions "linked" to this
Destination
synchronized (createdSessions) {
-
- Iterator i=createdSessions.iterator();
+
+ Iterator i = createdSessions.iterator();
while (i.hasNext()) {
-
((SpySession)i.next()).deleteTemporaryDestination(dest);
+ ((SpySession)
i.next()).deleteTemporaryDestination(dest);
}
}
-
+
//Ask the broker to delete() this TemporaryDestination
provider.deleteTemporaryDestination(distributedConnection,
dest);
-
+
} catch (Exception e) {
- failureHandler(e,"Cannot delete the TemporaryDestination");
+ failureHandler(e, "Cannot delete the TemporaryDestination");
}
-
+
}
-
+
//Get a new messageID (creation of a new message)
- String getNewMessageID() throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
-
- return clientID+"-"+(lastMessageID++);
+ String getNewMessageID() throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (distributedConnection == null)
+ createReceiver();
+
+ return clientID + "-" + (lastMessageID++);
}
-
+
//notify his sessions that he has changed his stopped mode
- synchronized void changeModeStop(boolean newValue)
- {
+ synchronized void changeModeStop(boolean newValue) {
synchronized (createdSessions) {
-
- Iterator i=createdSessions.iterator();
+
+ Iterator i = createdSessions.iterator();
while (i.hasNext()) {
- ((SpySession)i.next()).notifyStopMode(newValue);
+ ((SpySession) i.next()).setStopMode(newValue);
}
}
-
+
}
-
+
//Called by a session when it is closing
- void sessionClosing(SpySession who)
- {
- synchronized (createdSessions)
- {
+ void sessionClosing(SpySession who) {
+ synchronized (createdSessions) {
createdSessions.remove(who);
}
//This session should not be in the "destinations" object anymore.
//We could check this, though
- }
-
-
- // Protected -------------------------------------------------------
-
- //create a new Distributed object which receives the messages for this
connection
- protected void createReceiver() throws JMSException
- {
- try {
- if (clientID==null) askForAnID();
-
- org.spydermq.distributed.interfaces.ConnectionReceiverSetup cr
=
(org.spydermq.distributed.interfaces.ConnectionReceiverSetup)Class.forName(crClassName).newInstance();
- cr.setConnection(this);
- distributedConnection=new
SpyDistributedConnection(clientID,cr);
-
distributedConnection.setConnectionReceiver(cr.createClient());
-
- provider.setSpyDistributedConnection(distributedConnection);
- } catch (Exception e) {
- failureHandler(e,"Cannot create a ConnectionReceiver");
- }
- }
-
- //ask the JMS server for a new ID
- protected void askForAnID() throws JMSException
- {
- try {
- clientID=provider.getID();
- } catch (Exception e) {
- failureHandler(e,"Cannot get an ID");
- }
}
-
-
- public void failureHandler(Exception e,String reason) throws JMSException
- {
+ public void failureHandler(Exception e, String reason) throws JMSException {
e.printStackTrace();
-
- JMSException excep=new JMSException(reason);
+
+ JMSException excep = new JMSException(reason);
excep.setLinkedException(e);
-
- if (exceptionListener!=null) {
+
+ if (exceptionListener != null) {
synchronized (exceptionListener) {
exceptionListener.onException(excep);
}
}
-
+
throw excep;
}
-
+
public DistributedJMSServer getProvider() {
return provider;
}
-
+
+ //ask the JMS server for a new ID
+ protected void askForAnID() throws JMSException {
+ try {
+ clientID = provider.getID();
+ } catch (Exception e) {
+ failureHandler(e, "Cannot get an ID");
+ }
+ }
+
// The ConsumerSet inner class is used by:
//
// addConsumer()
@@ -350,137 +335,206 @@
// pickListeningConsumer()
//
class ConsumerSet extends HashSet {
- boolean lasListeningState=false;
-
+ boolean lasListeningState = false;
+
boolean getLasListeningState() {
return lasListeningState;
}
-
- boolean listenStateChanged() {
+
+ boolean listenStateChanged() {
boolean t = false;
-
+
Iterator iter = iterator();
- while( iter.hasNext() ) {
- SpyMessageConsumer c = (SpyMessageConsumer)iter.next();
- if( c.isListening() ) {
+ while (iter.hasNext()) {
+ SpyConsumer c = (SpyConsumer) iter.next();
+ if (c.isListening()) {
t = true;
break;
}
}
-
- if( t == lasListeningState ) {
+
+ if (t == lasListeningState) {
return false;
}
+
lasListeningState = t;
return true;
}
}
-
- //A new Consumer has been created for the Destination dest
- void addConsumer(Destination dest, SpyMessageConsumer consumer) throws
JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
-
- Log.log("Connection: addConsumer(dest="+dest.toString()+")");
- try {
+ /**
+ * Called whenever a consumer changes his listening state on a destination.
+ * We see if the consumer change, changed the overall listening state for the
destination.
+ * Creation date: (11/16/2000 2:20:22 PM)
+ * @return org.spydermq.distributed.interfaces.DistributedJMSServer
+ */
+ public void listenerChange(Destination d) throws JMSException {
- synchronized (destinations) {
-
- ConsumerSet
consumerSet=(ConsumerSet)destinations.get(dest);
-
- if (consumerSet==null) {
- consumerSet=new ConsumerSet();
- consumerSet.add(consumer);
- HashMap
newDestinations=(HashMap)destinations.clone();
- newDestinations.put(dest,consumerSet);
- destinations=newDestinations;
- provider.subscribe(distributedConnection,dest);
- } else {
- consumerSet.add(consumer);
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (distributedConnection == null)
+ createReceiver();
+
+ ConsumerSet ci = (ConsumerSet) destinations.get(d);
+
+ if( ci == null )
+ return;
+ if (ci.listenStateChanged()) {
+ try {
+ if (ci.getLasListeningState()) {
+
provider.connectionListening(distributedConnection, true, d);
+ } else {
+
provider.connectionListening(distributedConnection, false, d);
}
+ } catch (Exception e) {
+ failureHandler(e, "Cannot contact the JMS server");
}
+ }
+ }
+
+ /**
+ * @return org.spydermq.distributed.interfaces.DistributedJMSServer
+ */
+ SpyMessage queueReceive(Queue queue, long wait) throws JMSException {
+
+ try {
+ return provider.queueReceive(distributedConnection, queue,
wait);
} catch (Exception e) {
- failureHandler(e,"Cannot subscribe to this Destination");
- }
-
+ failureHandler(e, "Cannot create a ConnectionReceiver");
+ return null;
+ }
}
+
+ ////////////////////////////////////////////////////////////////////
+ // Protected
+ ////////////////////////////////////////////////////////////////////
+ //create a new Distributed object which receives the messages for this
connection
+ protected void createReceiver() throws JMSException {
+ try {
+ if (clientID == null)
+ askForAnID();
- //A consumer does not need to recieve the messages from a Destination
- void removeConsumer(Destination dest, SpyMessageConsumer who) throws
JMSException {
-
- if (distributedConnection==null) createReceiver();
-
- Log.log("Connection: removeSession(dest="+dest.toString()+")");
-
+ org.spydermq.distributed.interfaces.ConnectionReceiverSetup cr
=
+
(org.spydermq.distributed.interfaces.ConnectionReceiverSetup)
Class.forName(crClassName).newInstance();
+ cr.setConnection(this);
+ distributedConnection = new SpyDistributedConnection(clientID,
cr);
+ distributedConnection.setConnectionReceiver(cr.createClient());
+
+ provider.setSpyDistributedConnection(distributedConnection);
+ } catch (Exception e) {
+ failureHandler(e, "Cannot create a ConnectionReceiver");
+ }
+ }
+
+ // used to acknowledge a message
+ protected void send(SpyAcknowledgementItem item) throws JMSException {
+ try {
+ provider.acknowledge(distributedConnection, item);
+ } catch (Exception e) {
+ failureHandler(e, "Cannot acknowlege a message.");
+ }
+ }
+
+ //Send a message to the provider
+ void sendToServer(SpyMessage mes) throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (distributedConnection == null)
+ createReceiver();
+
+ try {
+
+ provider.addMessage(distributedConnection, mes);
+
+ } catch (Exception e) {
+ failureHandler(e, "Cannot send a message to the JMS provider");
+ }
+ }
+
+ // Used to commit/rollback a transaction.
+ protected void send(TransactionRequest transaction) throws JMSException {
+
+ try {
+ provider.transact(distributedConnection, transaction);
+ } catch (Exception e) {
+ failureHandler(e, "Cannot process a transaction.");
+ }
+
+ }
+
+ //A new Consumer has been created for the Destination dest
+ void addConsumer(Destination dest, SpyConsumer consumer) throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (distributedConnection == null)
+ createReceiver();
+
+ Log.log("Connection: addConsumer(dest=" + dest.toString() + ")");
+
try {
-
+
synchronized (destinations) {
-
- ConsumerSet
consumerSet=(ConsumerSet)destinations.get(dest);
-
- if (consumerSet!=null) {
- boolean empty=consumerSet.remove(who);
- if (empty) {
- HashMap
newDestinations=(HashMap)destinations.clone();
- newDestinations.remove(dest);
- destinations=newDestinations;
-
provider.unsubscribe(distributedConnection, dest);
- }
+
+ ConsumerSet consumerSet = (ConsumerSet)
destinations.get(dest);
+
+ if (consumerSet == null) {
+ consumerSet = new ConsumerSet();
+ consumerSet.add(consumer);
+ HashMap newDestinations = (HashMap)
destinations.clone();
+ newDestinations.put(dest, consumerSet);
+ destinations = newDestinations;
+ provider.subscribe(distributedConnection,
dest);
} else {
- //this should not happen
- HashMap
newDestinations=(HashMap)destinations.clone();
- newDestinations.remove(dest);
- destinations=newDestinations;
- provider.unsubscribe(distributedConnection,
dest);
+ consumerSet.add(consumer);
}
-
}
-
+
} catch (Exception e) {
- failureHandler(e,"Cannot unsubscribe to this destination");
+ failureHandler(e, "Cannot subscribe to this Destination");
}
- }
+ }
-
//Gets all the consumers subscribed to a destination
- public SpyMessageConsumer[] getConsumers(Destination dest) throws JMSException
{
-
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
-
+ public SpyConsumer[] getConsumers(Destination dest) throws JMSException {
+
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (distributedConnection == null)
+ createReceiver();
+
synchronized (destinations) {
- ConsumerSet consumerSet=(ConsumerSet)destinations.get(dest);
- if (consumerSet==null || consumerSet.size()==0)
+ ConsumerSet consumerSet = (ConsumerSet) destinations.get(dest);
+ if (consumerSet == null || consumerSet.size() == 0)
return null;
-
- SpyMessageConsumer rc[]=new
SpyMessageConsumer[consumerSet.size()];
- return (SpyMessageConsumer[])consumerSet.toArray(rc);
+
+ SpyConsumer rc[] = new SpyConsumer[consumerSet.size()];
+ return (SpyConsumer[]) consumerSet.toArray(rc);
}
-
+
}
//Gets the first consumer that is listening to a destination.
- public SpyMessageConsumer pickListeningConsumer(Destination dest) throws
JMSException {
-
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
-
+ public SpyConsumer pickListeningConsumer(Destination dest) throws JMSException
{
+
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+ if (distributedConnection == null)
+ createReceiver();
+
synchronized (destinations) {
-
- ConsumerSet consumerSet=(ConsumerSet)destinations.get(dest);
-
- if (consumerSet==null || consumerSet.size()==0) {
+
+ ConsumerSet consumerSet = (ConsumerSet) destinations.get(dest);
+
+ if (consumerSet == null || consumerSet.size() == 0) {
return null;
- } else {
+ } else {
Iterator i = consumerSet.iterator();
- while( i.hasNext() ) {
- SpyMessageConsumer c =
(SpyMessageConsumer)i.next();
- if( c.isListening() || c.waitInReceive ) {
+ while (i.hasNext()) {
+ SpyConsumer c = (SpyConsumer) i.next();
+ if (c.isListening() || c.isReceiving()) {
return c;
}
}
@@ -488,82 +542,37 @@
}
return null;
-
- }
-
- /**
- * Called whenever a consumer changes his listening state on a destination.
- * We see if the consumer change, changed the overall listening state for the
destination.
- * Creation date: (11/16/2000 2:20:22 PM)
- * @return org.spydermq.distributed.interfaces.DistributedJMSServer
- */
- public void listenerChange(Destination d) throws JMSException {
-
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
-
- ConsumerSet ci=(ConsumerSet)destinations.get(d);
- if( ci.listenStateChanged() ) {
- try {
- if ( ci.getLasListeningState() ) {
-
provider.connectionListening(distributedConnection,true,d);
- } else {
-
provider.connectionListening(distributedConnection,false,d);
- }
- } catch ( Exception e ) {
- failureHandler(e, "Cannot contact the JMS server");
- }
- }
-
- }
-
-
- /**
- * @return org.spydermq.distributed.interfaces.DistributedJMSServer
- */
- SpyMessage queueReceive(Queue queue, long wait) throws JMSException {
-
- try {
- return provider.queueReceive(distributedConnection, queue,
wait);
- } catch (Exception e) {
- failureHandler(e,"Cannot create a ConnectionReceiver");
- return null;
- }
+
}
-
- // used to acknowledge a message
- protected void send(SpyAcknowledgementItem item) throws JMSException
- {
- try {
- provider.acknowledge(distributedConnection, item);
- } catch (Exception e) {
- failureHandler(e,"Cannot acknowlege a message.");
- }
- }
-
- //Send a message to the provider
- void sendToServer(SpyMessage mes) throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
+ //A consumer does not need to recieve the messages from a Destination
+ void removeConsumer(Destination dest, SpyConsumer who) throws JMSException {
+
+ if (distributedConnection == null)
+ createReceiver();
+
+ Log.log("Connection: removeSession(dest=" + dest.toString() + ")");
+
try {
+
+ synchronized (destinations) {
+
+ ConsumerSet consumerSet = (ConsumerSet)
destinations.get(dest);
+ if (consumerSet == null)
+ throw new RuntimeException("Destination does
not have any consumers.");
- provider.addMessage(distributedConnection, mes);
-
- } catch (Exception e) {
- failureHandler(e,"Cannot send a message to the JMS provider");
- }
- }
+ consumerSet.remove(who);
- // Used to commit/rollback a transaction.
- protected void send(TransactionRequest transaction) throws JMSException {
-
- try {
- provider.transact(distributedConnection, transaction);
+ if ( consumerSet.isEmpty() ) {
+ destinations.remove(dest);
+ provider.unsubscribe(distributedConnection,
dest);
+ }
+
+ }
+
} catch (Exception e) {
- failureHandler(e,"Cannot process a transaction.");
+ failureHandler(e, "Cannot unsubscribe to this destination");
}
-
+
}
}
1.7 +236 -121 spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SpyMessageConsumer.java 2000/12/19 06:43:34 1.6
+++ SpyMessageConsumer.java 2000/12/21 22:33:55 1.7
@@ -24,12 +24,9 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
-public class SpyMessageConsumer
- implements MessageConsumer
-{
- // Attributes ----------------------------------------------------
+abstract public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
//Link to my session
public SpySession session;
@@ -41,114 +38,208 @@
public Selector selector;
//The message selector
public String messageSelector;
-
//List of Pending messages (not yet delivered)
LinkedList messages;
//Is the consumer sleeping in a receive() ?
boolean waitInReceive;
- public Destination destination;
-
+ //The destination this consumer is getting messages from
+ SpyDestination destination;
//Am I in noLocal mode ?
boolean noLocal;
-
+
// Constructor ---------------------------------------------------
-
- SpyMessageConsumer(SpySession s)
- {
- session=s;
- messageListener=null;
- closed=false;
- selector=null;
- messageSelector=null;
- messages=new LinkedList();
- waitInReceive=false;
- }
-
-
-
+
+ SpyMessageConsumer(SpySession s, SpyDestination dest) {
+ session = s;
+ destination = dest;
+ messageListener = null;
+ closed = false;
+ selector = null;
+ messageSelector = null;
+ messages = new LinkedList();
+ waitInReceive = false;
+ }
+
// Public --------------------------------------------------------
+
+ public String getMessageSelector() throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The MessageConsumer is
closed");
- public String getMessageSelector() throws JMSException
- {
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
-
return messageSelector;
}
- public MessageListener getMessageListener() throws JMSException
- {
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
-
+ public MessageListener getMessageListener() throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The MessageConsumer is
closed");
+
return messageListener;
}
+
+ public void setMessageListener(MessageListener listener) throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The MessageConsumer is
closed");
+ if (waitInReceive)
+ throw new JMSException("This MessageConsumer is waiting in
receive() !");
+
+ messageListener = listener;
- public void setMessageListener(MessageListener listener) throws JMSException
- {
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
- if (waitInReceive) throw new JMSException("This MessageConsumer is
waiting in receive() !");
- //Job is done in the inherited classes
- //The QueueReceiver object need to notify their session / connection /
the broker
- }
-
- public Message receive() throws JMSException
- {
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
- if (messageListener!=null) throw new JMSException("A message listener
is already registered");
- //Job is done in the inherited classes
- //The QueueReceiver object need to notify their session / connection /
the broker
- return null;
- }
-
- public Message receive(long timeOut) throws JMSException
- {
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
- if (messageListener!=null) throw new JMSException("A message listener
is already registered");
- //Job is done in the inherited classes
- //The QueueReceiver object need to notify their session / connection /
the broker
- return null;
- }
-
- public Message receiveNoWait() throws JMSException
- {
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
- if (messageListener!=null) throw new JMSException("A message listener
is already registered");
- //Job is done in the inherited classes
- //The QueueReceiver object need to notify their session / connection /
the broker
- return null;
- }
-
- public synchronized void close() throws JMSException
- {
- //Job is done in the inherited classes
- //The QueueReceiver object need to notify their session / connection /
the broker
- throw new RuntimeException("pure virtual call");
+ //session.run();
}
-
+
+ public Message receive() throws JMSException {
+
+ if (closed)
+ throw new IllegalStateException("The MessageConsumer is
closed");
+ if (messageListener != null)
+ throw new JMSException("A message listener is already
registered");
+ if (destination == null)
+ throw new JMSException("No assigned destination.");
+
+ waitInReceive = true;
+
+ if (!isListening() && this instanceof SpyQueueReceiver)
+ session.connection.queueReceive((SpyQueue) destination, 0);
+
+ synchronized (messages) {
+
+ try {
+ while (true) {
+ if (closed)
+ return null;
+ if (!session.modeStop) {
+ Message mes = getMessage();
+ if (mes != null)
+ return mes;
+ } else
+ Log.notice("the connection is stopped
!");
+
+ Log.log("SpyMessageConsumer: receive in
messages.wait()");
+ messages.wait();
+ }
+ } catch (InterruptedException e) {
+ JMSException newE = new JMSException("Receive
interupted");
+ newE.setLinkedException(e);
+ throw newE;
+ } finally {
+ waitInReceive = false;
+ }
+ }
+
+ }
+
+ public Message receive(long timeOut) throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The MessageConsumer is
closed");
+ if (messageListener != null)
+ throw new JMSException("A message listener is already
registered");
+ if (destination == null)
+ throw new JMSException("No assigned destination.");
+
+ if (timeOut == 0)
+ return receive();
+
+ long endTime = System.currentTimeMillis() + timeOut;
+
+ waitInReceive = true;
+
+ if (!isListening() && this instanceof SpyQueueReceiver)
+ session.connection.queueReceive((SpyQueue) destination,
timeOut);
+
+ synchronized (messages) {
+
+ try {
+
+ while (true) {
+
+ if (closed)
+ return null;
+
+ if (!session.modeStop) {
+ Message mes = getMessage();
+ if (mes != null) {
+ return mes;
+ }
+
+ } else
+ Log.log("the connection is stopped !");
+
+ long att = endTime -
System.currentTimeMillis();
+ if (att <= 0) {
+ return null;
+ }
+
+ messages.wait(att);
+ }
+
+ } catch (InterruptedException e) {
+ JMSException newE = new JMSException("Receive
interupted");
+ newE.setLinkedException(e);
+ throw newE;
+ } finally {
+ waitInReceive = false;
+ }
+ }
+
+ }
+
+ public Message receiveNoWait() throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The MessageConsumer is
closed");
+ if (messageListener != null)
+ throw new JMSException("A message listener is already
registered");
+ if (destination == null)
+ throw new JMSException("No assigned destination.");
+
+ waitInReceive = true;
+ try {
+
+ if (!isListening() && this instanceof SpyQueueReceiver) {
+
+ if (session.modeStop)
+ return null;
+ return session.connection.queueReceive((SpyQueue)
destination, -1);
+ }
+
+ synchronized (messages) {
+ while (true) {
+ if (session.modeStop)
+ return null;
+ return getMessage();
+ }
+ }
+
+ } finally {
+ waitInReceive = false;
+ }
+ }
+
+ abstract public void close() throws JMSException;
+
//Package protected - Not part of the spec
-
- void setSelector(Selector selector,String messageSelector)
- {
- this.selector=selector;
- this.messageSelector=messageSelector;
- }
-
- SpyMessage getMessage()
- {
+
+ void setSelector(Selector selector, String messageSelector) {
+ this.selector = selector;
+ this.messageSelector = messageSelector;
+ }
+
+ SpyMessage getMessage() {
synchronized (messages) {
-
+
while (true) {
try {
- if (messages.size()==0) return null;
-
- SpyMessage
mes=(SpyMessage)messages.removeFirst();
-
+ if (messages.size() == 0)
+ return null;
+
+ SpyMessage mes = (SpyMessage)
messages.removeFirst();
+
if (mes.isOutdated()) {
Log.notice("SessionQueue: I dropped a
message (timeout)");
continue;
}
-
- if (selector!=null) {
+
+ if (selector != null) {
if (!selector.test(mes)) {
Log.log("SessionQueue: I
dropped a message (selector)");
continue;
@@ -156,76 +247,100 @@
Log.log("SessionQueue:
selector evaluates TRUE");
}
}
-
+
if (noLocal &&
mes.producerClientId.equals(session.connection.clientID)) {
Log.notice("SessionQueue: I dropped a
message (noLocal)");
continue;
}
-
+
//the SAME Message object is put in different
SessionQueues
//when we deliver it, we have to clone() it to
insure independance
- SpyMessage message=mes.myClone();
+ SpyMessage message = mes.myClone();
message.setSpySession(session);
- if( session.transacted ) {
+ if (session.transacted) {
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId,
message);
- } else if(
session.acknowledgeMode==session.AUTO_ACKNOWLEDGE ||
session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
+ } else if (session.acknowledgeMode ==
session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) {
message.doAcknowledge();
}
return message;
-
+
} catch (Exception e) {
e.printStackTrace();
}
}
-
+
}
}
-
- public void addMessage(SpyMessage mes) throws JMSException
- {
- //Set the session in the message so it can acknowlege
- mes.setSpySession(session);
-
+
+ public void addMessage(SpyMessage mes) throws JMSException {
synchronized (messages) {
//Add a message to the queue
- messages.addLast(mes);
+ messages.addLast(mes);
}
}
-
-
+
public boolean deliverMessage() throws JMSException {
-
+
synchronized (messages) {
- if (messages.size()==0)
+ if (messages.size() == 0)
return false;
-
- if (messageListener==null) {
- if (!waitInReceive)
+
+ if (messageListener == null) {
+ if (!waitInReceive) {
+
+ // If no Listener and No reciver is waiting
for a message
+ // Then we neg ack the message back to the
server in the queue case.
+ if (this instanceof SpyQueueReceiver) {
+
+ SpyMessage mes = getMessage();
+ while (mes == null) {
+
+ Log.log("Got unrequested
message, sending NACK for: " + mes);
+ SpyAcknowledgementItem item =
new SpyAcknowledgementItem();
+ item.jmsDestination =
mes.getJMSDestination();
+ item.jmsMessageID =
mes.getJMSMessageID();
+ item.isAck = false;
+
+ session.connection.send(item);
+
+ mes = getMessage();
+ }
+
+ }
return false;
+ }
messages.notify();
+
} else {
- SpyMessage mes=getMessage();
- if (mes==null)
+
+ SpyMessage mes = getMessage();
+ if (mes == null)
return false;
-
+
messageListener.onMessage(mes);
- if( session.transacted ) {
+ if (session.transacted) {
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
- } else if(
session.acknowledgeMode==session.AUTO_ACKNOWLEDGE ||
session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
+ } else if (session.acknowledgeMode ==
session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) {
mes.doAcknowledge();
- }
-
- }
- }
+ }
+
+ }
+ }
+
return true;
- }
+ }
+
+ abstract public boolean isListening();
+
+ public boolean isReceiving() {
+ return waitInReceive;
+ }
-
- public boolean isListening() {
- return false;
+ public void processMessages() throws JMSException {
+ session.mutex.notifyLock();
}
}
1.3 +4 -3 spyderMQ/src/java/org/spydermq/SpyQueueConnection.java
Index: SpyQueueConnection.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueConnection.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyQueueConnection.java 2000/12/12 05:58:58 1.2
+++ SpyQueueConnection.java 2000/12/21 22:33:55 1.3
@@ -13,7 +13,9 @@
import javax.jms.ServerSessionPool;
import javax.jms.TemporaryQueue;
import javax.jms.Queue;
+
import java.io.Serializable;
+
import org.spydermq.distributed.interfaces.DistributedJMSServer;
/**
@@ -22,7 +24,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyQueueConnection
extends SpyConnection
@@ -60,8 +62,7 @@
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
- //Not impelemted yet
- return null;
+ return new SpyConnectionConsumer(this, queue, messageSelector,
sessionPool, maxMessages);
}
1.7 +33 -170 spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SpyQueueReceiver.java 2000/12/19 06:43:34 1.6
+++ SpyQueueReceiver.java 2000/12/21 22:33:55 1.7
@@ -18,204 +18,67 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
-public class SpyQueueReceiver
- extends SpyMessageConsumer
- implements QueueReceiver
-{
+public class SpyQueueReceiver extends SpyMessageConsumer implements QueueReceiver {
// Attributes ----------------------------------------------------
//The queue I registered
private Queue queue;
//Mode of this QueueReceiver
boolean listening;
-
+
// Constructor ---------------------------------------------------
-
- SpyQueueReceiver(SpyQueueSession session,Queue queue)
- {
- super(session);
- this.destination=queue;
- this.queue=queue;
- listening=false;
+
+ SpyQueueReceiver(SpyQueueSession session, Queue queue) {
+ super(session, (SpyQueue) queue);
+ this.queue = queue;
+ listening = false;
}
// Public --------------------------------------------------------
+
+ public Queue getQueue() throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The MessageConsumer is
closed");
- public Queue getQueue() throws JMSException
- {
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
-
return queue;
}
- public void close() throws JMSException
- {
- synchronized (messages) {
- if (closed) return;
- closed=true;
+ public void close() throws JMSException {
- setListening(false);
- }
- }
+ synchronized (messages) {
+ if (closed)
+ return;
- //Overrides MessageConsumer
- public Message receive() throws JMSException {
- super.receive();
+ if (queue != null)
+ session.removeConsumer(queue, this);
+ setListening(false);
- //if the client follows the specification [4.4.6], he cannot use this
session
- //to asynchronously receive a message or receive() in another thread.
- //If a message is already pending for this session, we can immediatly
deliver it
- synchronized (messages) {
-
- waitInReceive = true;
- session.connection.queueReceive(queue, 0);
-
- try {
-
- while (true) {
-
- if (!session.modeStop) {
-
- Message mes = getMessage();
- if (mes != null)
- return mes;
-
- } else
- Log.log("the connection is stopped !");
-
- messages.wait();
- }
-
- } catch (InterruptedException e) {
- JMSException newE = new JMSException("Receive
interupted");
- newE.setLinkedException(e);
- throw newE;
- } finally {
- waitInReceive=false;
- }
- }
- }
+ if (waitInReceive && messageListener == null) {
+ //A consumer could be waiting in receive()
+ messages.notify();
+ }
- public Message receive(long timeOut) throws JMSException
- {
- super.receive(timeOut);
-
- if (timeOut==0) return receive();
- long endTime=System.currentTimeMillis()+timeOut;
-
-
- //if the client respects the specification [4.4.6], he cannot use this
session
- //to asynchronously receive a message or receive() from another thread.
- //If a message is already pending for this session, we can deliver it
- synchronized (messages) {
-
- waitInReceive=true;
- session.connection.queueReceive(queue,timeOut);
-
- try {
-
- while (true) {
-
- if (!session.modeStop) {
- Message mes=getMessage();
- if (mes!=null) {
- return mes;
- }
-
- } else
- Log.log("the connection is stopped !");
-
- long att=endTime-System.currentTimeMillis();
- if (att<=0) {
- return null;
- }
-
- messages.wait(att);
- }
-
- } catch (InterruptedException e) {
- JMSException newE = new JMSException("Receive
interupted");
- newE.setLinkedException(e);
- throw newE;
- } finally {
- waitInReceive=false;
- }
+ closed = true;
}
-
}
-
- public Message receiveNoWait() throws JMSException
- {
- super.receiveNoWait();
- if (session.modeStop) return null;
- return session.connection.queueReceive(queue,-1);
- }
- public void setMessageListener(MessageListener listener) throws JMSException
- {
+ public void setMessageListener(MessageListener listener) throws JMSException {
super.setMessageListener(listener);
-
- messageListener=listener;
- setListening(listener!=null);
+ setListening(listener != null);
}
-
- //---
- void setListening(boolean newvalue) throws JMSException
- {
- if (newvalue==listening) return;
- listening=newvalue;
-
- session.getConnection().listenerChange(queue);
- }
-
- //Called by the ConnectionReceiver which has just received a message - in the
Queue case only
- public void dispatchMessage(SpyMessage mes) throws JMSException {
- if (session.closed)
- throw new NoReceiverException("The session is closed");
- if (session.modeStop)
- throw new NoReceiverException("The session is stopped");
- if (mes.isOutdated())
+ //---
+ void setListening(boolean newvalue) throws JMSException {
+ if (newvalue == listening)
return;
-
- //Work with this receiver
- if (messageListener == null) {
- synchronized (messages) {
-
- if ( waitInReceive ) {
- if( messages.size()==0 ) {
- addMessage(mes);
- messages.notify();
- } else {
- Log.notice("Got too many messages for
one receive.!");
- throw new NoReceiverException("Got too
many messages for one receive.!");
- }
- } else {
- Log.notice("Message did not arrive in time for
the receive!");
- throw new NoReceiverException("Message did not
arrive in time for the receive!");
- }
-
- }
- } else {
-
- if (!isListening())
- throw new NoReceiverException("The receiver is not
longer listening!");
-
- //Set the session in the message so it can acknowlege
- mes.setSpySession(session);
- messageListener.onMessage(mes);
-
- if( session.transacted ) {
-
session.connection.spyXAResourceManager.ackMessage(session.currentTransactionId, mes);
- } else if( session.acknowledgeMode==session.AUTO_ACKNOWLEDGE
|| session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
- mes.doAcknowledge();
- }
- }
-
- }
+ listening = newvalue;
+
+ if (queue != null)
+ session.getConnection().listenerChange(queue);
+ }
public boolean isListening() {
return listening;
1.9 +9 -1 spyderMQ/src/java/org/spydermq/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- SpyQueueSession.java 2000/12/19 06:43:33 1.8
+++ SpyQueueSession.java 2000/12/21 22:33:55 1.9
@@ -22,19 +22,20 @@
import java.util.Iterator;
+import javax.jms.MessageListener;
+
/**
* This class implements javax.jms.QueueSession and javax.jms.XAQueueSession
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
public class SpyQueueSession
extends SpySession
implements QueueSession, XAQueueSession
{
-
// Constructor ---------------------------------------------------
SpyQueueSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop, boolean xaSession)
@@ -111,4 +112,11 @@
return this;
}
+ public void setMessageListener(MessageListener listener) throws JMSException
+ {
+
+ super.setMessageListener(listener);
+ sessionConsumer = new SpyQueueReceiver(this, null);
+
+ }
}
1.16 +119 -128 spyderMQ/src/java/org/spydermq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -r1.15 -r1.16
--- SpySession.java 2000/12/19 06:43:33 1.15
+++ SpySession.java 2000/12/21 22:33:55 1.16
@@ -34,9 +34,9 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.15 $
+ * @version $Revision: 1.16 $
*/
-public class SpySession
+abstract public class SpySession
implements Runnable, Session, XASession
{
// Attributes ----------------------------------------------------
@@ -49,16 +49,21 @@
private MessageListener messageListener;
//The connection object to which this session is linked
protected SpyConnection connection;
+ // This consumer is the consumer that receives messages for the MessageListener
+ // assigned to the session. The SpyConnectionConsumer delivers messages to him
+ SpyMessageConsumer sessionConsumer;
+ //MessageConsumers created by this session
+ protected HashSet consumers;
//Is my connection in stopped mode ?
protected boolean modeStop;
//Is the session closed ?
boolean closed;
- //This object is the object used to synchronize the session's thread - Need
fixed / improvement
- public Mutex mutex;
- //MessageConsumers created by this session
- protected HashSet consumers;
-
+ // Used to notify the session thread to deliver messages
+ Mutex mutex;
+ // Used to lock the run() method
+ Object runLock=new Object();
+
//The transctionId of the current transaction (registed with the
SpyXAResourceManager)
Object currentTransactionId;
// If this is an XASession, we have an associated XAResource
@@ -76,26 +81,34 @@
if( xaSession )
spyXAResource = new SpyXAResource(this);
+ mutex = new Mutex();
messageListener=null;
closed=false;
- mutex=new Mutex();
consumers = new HashSet();
-
- //Start my thread
- Thread oneThread=new Thread(this, "SpySession");
- oneThread.setDaemon(true);
- oneThread.start();
-
- //Wait for the thread to sleep
- mutex.waitLocked();
-
+
//Have a TX ready with the resource manager.
if( spyXAResource==null && transacted )
currentTransactionId =
connection.spyXAResourceManager.startTx();
+ //Start my thread
+ Thread oneThread=new Thread("SpySession") {
+ public void run() {
+ mutex.acquireLock();
+ while( !closed ) {
+ SpySession.this.run();
+ mutex.waitLock();
+ }
+ mutex.releaseLock();
+ }
+ };
+ oneThread.setDaemon(true);
+ oneThread.start();
+
+ mutex.waitLocked();
}
+
// Public --------------------------------------------------------
public BytesMessage createBytesMessage() throws JMSException
@@ -189,46 +202,62 @@
public void setMessageListener(MessageListener listener) throws JMSException
{
- if (closed) throw new IllegalStateException("The session is closed");
-
- messageListener=listener;
+ if (closed) throw new IllegalStateException("The session is closed");
+ messageListener = listener;
+
+ mutex.notifyLock();
}
- //The thread for this session. It sends outgoing messages and delivers
incoming ones
+ // Delivers incoming messages in this session
public void run()
- {
- mutex.acquireLock();
-
- while (true) {
+ {
+ synchronized (runLock) {
+
+ Log.log("SpySession: Message delivery started");
+
+ boolean done=false;
+ while (!done) {
+ if (closed) break;
- boolean doneJob=false;
- if (closed) break;
-
- try {
- //if we are not in stopped mode, look at the incoming
queue
- if (!modeStop) {
- Iterator i=consumers.iterator();
- while (i.hasNext()) {
- SpyMessageConsumer
mc=(SpyMessageConsumer)i.next();
- doneJob=doneJob||mc.deliverMessage();
- }
+ // If we can deliver any messages, then we are not
done.
+ done = true;
+
+ try {
+
+ //if we are not in stopped mode, look at the
incoming queue
+ //Consisder if should be stopped because we
are outside the XA transaction (start/end)
+ boolean xaStop = spyXAResource!=null &&
currentTransactionId==null;
+ if (!(modeStop || xaStop)) {
+ Iterator i;
+
+ synchronized (consumers) {
+ i =
((HashSet)consumers.clone()).iterator();
+ }
+
+ while (i.hasNext()) {
+ SpyMessageConsumer
mc=(SpyMessageConsumer)i.next();
+ if( mc.deliverMessage() )
+ done = false;
+ }
+
+ synchronized (consumers) {
+ if( sessionConsumer != null ) {
+ if(
sessionConsumer.deliverMessage() )
+ done = false;
+ }
+ }
+
+ }
+ } catch (JMSException e) {
+ Log.log("Cannot receive a message from the
provider...");
+ Log.error(e);
}
- } catch (JMSException e) {
- Log.log("Cannot receive a message from the
provider...");
- Log.error(e);
+
}
-
- //If there was smthg to do, try again
- if (doneJob) continue;
-
- //Log.log("SessionThread: I'm going to bed...");
- mutex.waitLock();
- //Log.log("SessionThread: I wake up");
-
+
}
-
- mutex.releaseLock();
+ Log.log("SpySession: Message delivery ended");
}
public synchronized void close() throws JMSException
@@ -236,20 +265,24 @@
// allow other threads to process before closing this session
// Patch submitted by John Ellis (10/29/00)
Thread.yield();
-
- if (closed) return;
- closed=true;
- //if the thread is sleeping, kill it
- mutex.notifyLock();
- mutex.waitToSleep();
-
- //notify the sleeping synchronous listeners
-
- Iterator i=consumers.iterator();
- while (i.hasNext()) {
- SpyMessageConsumer
messageConsumer=(SpyMessageConsumer)i.next();
- messageConsumer.close();
+ synchronized (runLock) {
+ if (closed) return;
+ closed=true;
+ }
+
+ synchronized (consumers) {
+
+ //notify the sleeping synchronous listeners
+ if ( sessionConsumer != null )
+ sessionConsumer.close();
+
+ Iterator i=consumers.iterator();
+ while (i.hasNext()) {
+ SpyMessageConsumer
messageConsumer=(SpyMessageConsumer)i.next();
+ messageConsumer.close();
+ }
+
}
connection.sessionClosing(this);
@@ -263,17 +296,12 @@
if (spyXAResource!=null) throw new
javax.jms.TransactionInProgressException("Should not be call from a XASession");
if (closed) throw new IllegalStateException("The session is closed");
if (!transacted) throw new IllegalStateException("The session is not
transacted");
-
Log.log("Session: commit()");
- boolean modeSav=modeStop;
- modeStop=true;
-
- //Wait for the thread to sleep
- synchronized (mutex) {
- mutex.waitToSleep();
-
+ //Don't deliver any more messages while commiting
+ synchronized (runLock) {
+
// commit transaction with onePhase commit
try {
connection.spyXAResourceManager.commit(currentTransactionId, true);
@@ -288,9 +316,6 @@
} catch ( Exception ignore ) {}
}
- //We have finished our work, we can wake up the thread
- modeStop=modeSav;
- mutex.notifyLock();
}
}
@@ -303,15 +328,10 @@
if (!transacted) throw new IllegalStateException("The session is not
transacted");
Log.log("Session: rollback()");
-
- boolean modeSav=modeStop;
- modeStop=true;
-
- //Wait for the thread to sleep
- synchronized (mutex) {
+
+ // Stop message delivery
+ synchronized (runLock) {
- mutex.waitToSleep();
-
// rollback transaction
try {
connection.spyXAResourceManager.rollback(currentTransactionId);
@@ -324,11 +344,8 @@
try {
currentTransactionId =
connection.spyXAResourceManager.startTx();
} catch ( Exception ignore ) {}
- }
-
- //We have finished our work, we can wake up the thread
- modeStop=modeSav;
- mutex.notifyLock();
+ }
+
}
}
@@ -365,27 +382,17 @@
return connection.getNewMessageID();
}
- //The connection has changed its mode (stop() or start())
- //We have to wait until message delivery has stopped or wake up the thread
- void notifyStopMode(boolean newValue)
- {
-
- if (closed) throw new IllegalStateException("The session is closed");
- if (modeStop==newValue) return;
-
- modeStop=newValue;
- notifyStopMode();
-
- }
void removeConsumer(Destination dest, SpyMessageConsumer who) throws
JMSException
{
Log.log("Session:
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
+
+ synchronized (connection) {
+ connection.removeConsumer(dest, who );
+ }
consumers.remove( who );
-
- connection.removeConsumer(dest, who );
}
void addConsumer(Destination dest, SpyMessageConsumer who) throws JMSException
@@ -393,22 +400,20 @@
if (closed) throw new IllegalStateException("The session is closed");
Log.log("Session:
subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
- connection.addConsumer(dest, who);
+
+ synchronized (consumers) {
+ consumers.add( who );
+ }
- consumers.add( who );
+ connection.addConsumer(dest, who);
}
- /**
- * @return org.spydermq.SpyConnection
- */
public SpyConnection getConnection() {
return connection;
}
-
-
//called by a MessageProducer object which needs to publish a message
void sendMessage(SpyMessage m) throws JMSException {
@@ -423,35 +428,21 @@
}
-
- /**
- * getXAResource method comment.
- */
public javax.transaction.xa.XAResource getXAResource() {
- return null;
+ return spyXAResource;
}
- // The XA transaction state may have changed... either it started or ended
- // We have to wait until message delivery has stopped or wake up the thread
- void notifyStopMode()
+ //The connection has changed its mode (stop() or start())
+ //We have to wait until message delivery has stopped or wake up the thread
+ void setStopMode(boolean newValue)
{
- // Should it be stopped because we are outside the XA transaction
(start/end)
- boolean xaStop = spyXAResource!=null && currentTransactionId==null;
+ if (closed) throw new IllegalStateException("The session is closed");
+ if (modeStop==newValue) return;
- if ( modeStop || xaStop ) {
-
- //Wait for the thread to sleep
- mutex.waitToSleep();
-
- } else {
-
- //Wake up the thread
- mutex.notifyLock();
-
- }
+ modeStop=newValue;
+ mutex.notifyLock();
}
-
}
1.11 +10 -0 spyderMQ/src/java/org/spydermq/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- SpyTopicSession.java 2000/12/19 06:43:33 1.10
+++ SpyTopicSession.java 2000/12/21 22:33:55 1.11
@@ -27,13 +27,15 @@
import org.spydermq.Log;
+import javax.jms.MessageListener;
+
/**
* This class implements javax.jms.TopicSession and javax.jms.XATopicSession
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.10 $
+ * @version $Revision: 1.11 $
*/
public class SpyTopicSession
extends SpySession
@@ -122,5 +124,13 @@
*/
public javax.jms.TopicSession getTopicSession() throws javax.jms.JMSException {
return this;
+ }
+
+ public void setMessageListener(MessageListener listener) throws JMSException
+ {
+
+ super.setMessageListener(listener);
+ sessionConsumer = new SpyTopicSubscriber(this, null, false);
+
}
}
1.8 +17 -104 spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
Index: SpyTopicSubscriber.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SpyTopicSubscriber.java 2000/12/13 15:59:10 1.7
+++ SpyTopicSubscriber.java 2000/12/21 22:33:56 1.8
@@ -20,7 +20,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class SpyTopicSubscriber
extends SpyMessageConsumer
@@ -36,8 +36,7 @@
SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean noLocal)
{
- super(session);
- destination=topic;
+ super(session, (SpyTopic)topic);
this.topic=topic;
this.noLocal=noLocal;
}
@@ -59,115 +58,29 @@
//Overrides MessageConsumer
public void close() throws JMSException
- {
- if (closed) return;
- closed=true;
-
- session.removeConsumer(topic,this);
-
- if (waitInReceive&&messageListener==null) {
-
- //A consumer could be waiting in receive()
- synchronized (messages) {
- messages.notify();
- }
-
- }
- }
-
- public Message receive() throws JMSException
{
- super.receive();
-
- synchronized (messages) {
-
- //if the client follows the specification [4.4.6], he cannot
use this session
- //to asynchronously receive a message or receive() in another
thread.
- //If a message is already pending for this session, we can
immediatly deliver it
-
- while (true) {
-
- if (closed) return null;
-
- if (!session.modeStop) {
- Message mes=getMessage();
- if (mes!=null) return mes;
- } else Log.notice("the connection is stopped !");
-
- try {
- waitInReceive=true;
- messages.wait();
- } catch (InterruptedException e) {
- } finally {
- waitInReceive=false;
- }
-
- }
- }
- }
- public Message receive(long timeOut) throws JMSException
- {
- super.receive(timeOut);
-
- if (timeOut==0) return receive();
-
- long endTime=System.currentTimeMillis()+timeOut;
-
synchronized (messages) {
-
- //if the client respects the specification [4.4.6], he cannot
use this session
- //to asynchronously receive a message or receive() from
another thread.
- //If a message is already pending for this session, we can
deliver it
+ if (closed)
+ return;
+
+ if (topic != null)
+ session.removeConsumer(topic, this);
- while (true) {
-
- if (closed) return null;
-
- if (!session.modeStop) {
- Message mes=getMessage();
- if (mes!=null) return mes;
- } else Log.notice("the connection is stopped !");
-
- long att=endTime-System.currentTimeMillis();
- if (att<=0) return null;
-
- try {
- waitInReceive=true;
- messages.wait(att);
- } catch (InterruptedException e) {
- } finally {
- waitInReceive=false;
- }
-
+ if (waitInReceive && messageListener == null) {
+ //A consumer could be waiting in receive()
+ messages.notify();
}
- }
-
- }
-
- public Message receiveNoWait() throws JMSException
- {
- super.receiveNoWait();
-
- synchronized (messages) {
- while (true) {
- if (session.modeStop) return null;
- return getMessage();
- }
-
+ closed = true;
}
- }
-
- public void setMessageListener(MessageListener listener) throws JMSException
- {
- super.setMessageListener(listener);
-
- messageListener=listener;
- //Signal the change to the session thread ( it could sleep, while
there are messages for him )
- session.mutex.notifyLock();
-
}
+ /**
+ * A topic is allways accepting messages from a destination.
+ */
+ public boolean isListening() {
+ return true;
+ }
}
1.2 +45 -36 spyderMQ/src/java/org/spydermq/SpyXAResource.java
Index: SpyXAResource.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyXAResource.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyXAResource.java 2000/12/19 06:43:32 1.1
+++ SpyXAResource.java 2000/12/21 22:33:56 1.2
@@ -16,10 +16,10 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyXAResource implements XAResource {
-
+
//////////////////////////////////////////////////////////////////
// Attributes
//////////////////////////////////////////////////////////////////
@@ -37,7 +37,7 @@
//////////////////////////////////////////////////////////////////
// Public Methods
//////////////////////////////////////////////////////////////////
-
+
/**
* commit method comment.
*/
@@ -48,7 +48,7 @@
throw new XAException(XAException.XAER_RMERR);
}
}
-
+
/**
* end method comment.
*/
@@ -56,36 +56,39 @@
if (session.currentTransactionId == null) {
throw new XAException(XAException.XAER_OUTSIDE);
}
- switch (flags) {
- case TMSUSPEND :
- session.currentTransactionId = null;
- session.connection.spyXAResourceManager.suspendTx(xid);
- break;
- case TMFAIL :
- session.currentTransactionId = null;
- session.connection.spyXAResourceManager.endTx(xid,
false);
- break;
- case TMSUCCESS :
- session.currentTransactionId = null;
- session.connection.spyXAResourceManager.endTx(xid,
true);
- break;
+
+ synchronized (session.runLock) {
+
+ switch (flags) {
+ case TMSUSPEND :
+ session.currentTransactionId = null;
+
session.connection.spyXAResourceManager.suspendTx(xid);
+ break;
+ case TMFAIL :
+ session.currentTransactionId = null;
+
session.connection.spyXAResourceManager.endTx(xid, false);
+ break;
+ case TMSUCCESS :
+ session.currentTransactionId = null;
+
session.connection.spyXAResourceManager.endTx(xid, true);
+ break;
+ }
}
- session.notifyStopMode();
}
-
+
/**
* forget method comment.
*/
public void forget(javax.transaction.xa.Xid arg1) throws
javax.transaction.xa.XAException {
}
-
+
/**
* getTransactionTimeout method comment.
*/
public int getTransactionTimeout() throws javax.transaction.xa.XAException {
return 0;
}
-
+
/**
* isSameRM method comment.
*/
@@ -94,7 +97,7 @@
return false;
return ((SpyXAResource) arg1).session.connection.spyXAResourceManager
== session.connection.spyXAResourceManager;
}
-
+
/**
* prepare method comment.
*/
@@ -105,14 +108,14 @@
throw new XAException(XAException.XAER_RMERR);
}
}
-
+
/**
* recover method comment.
*/
public Xid[] recover(int arg1) throws javax.transaction.xa.XAException {
return new Xid[0];
}
-
+
/**
* rollback method comment.
*/
@@ -123,14 +126,14 @@
throw new XAException(XAException.XAER_RMERR);
}
}
-
+
/**
* setTransactionTimeout method comment.
*/
public boolean setTransactionTimeout(int arg1) throws
javax.transaction.xa.XAException {
return false;
}
-
+
/**
* start method comment.
*/
@@ -138,17 +141,23 @@
if (session.currentTransactionId != null) {
throw new XAException(XAException.XAER_OUTSIDE);
}
- switch (flags) {
- case TMNOFLAGS :
- session.currentTransactionId =
session.connection.spyXAResourceManager.startTx(xid);
- break;
- case TMJOIN :
- session.currentTransactionId =
session.connection.spyXAResourceManager.joinTx(xid);
- break;
- case TMRESUME :
- session.currentTransactionId =
session.connection.spyXAResourceManager.resumeTx(xid);
- break;
+
+ synchronized (session.runLock) {
+
+ switch (flags) {
+ case TMNOFLAGS :
+ session.currentTransactionId =
session.connection.spyXAResourceManager.startTx(xid);
+ break;
+ case TMJOIN :
+ session.currentTransactionId =
session.connection.spyXAResourceManager.joinTx(xid);
+ break;
+ case TMRESUME :
+ session.currentTransactionId =
session.connection.spyXAResourceManager.resumeTx(xid);
+ break;
+ }
+ session.runLock.notify();
+
}
- session.notifyStopMode();
+
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java
Index: SpyConnectionConsumer.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.ServerSessionPool;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import java.util.Iterator;
import java.util.LinkedList;
/**
* This class implements javax.jms.ConnectionConsumer
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer,
SpyConsumer {
// The connection is the consumer was created with
SpyConnection connection;
// The destination this consumer will receive messages from
Destination destination;
// The ServerSessionPool that is implemented by the AS
javax.jms.ServerSessionPool serverSessionPool;
// The maximum number of messages that a single session will be loaded with.
int maxMessages;
// This queue will hold messages until they are dispatched to the
MessageListener
LinkedList queue = new LinkedList();
// Is the ConnectionConsumer closed?
boolean closed;
/**
* SpyConnectionConsumer constructor comment.
*/
public SpyConnectionConsumer(SpyConnection connection, Destination
destination, String messageSelector, ServerSessionPool serverSessionPool, int
maxMessages)
throws JMSException {
this.connection = connection;
this.destination = destination;
this.serverSessionPool = serverSessionPool;
this.maxMessages = maxMessages;
connection.addConsumer(destination, this);
}
public void addMessage(SpyMessage mes) throws JMSException {
queue.addLast(mes);
}
/**
* close method comment.
*/
public void close() throws javax.jms.JMSException {
connection.removeConsumer(destination, this);
closed = true;
}
/**
* getServerSessionPool method comment.
*/
public javax.jms.ServerSessionPool getServerSessionPool() throws
javax.jms.JMSException {
return serverSessionPool;
}
public boolean isListening() {
return true;
}
public boolean isReceiving() {
return false;
}
public void processMessages() throws JMSException {
ServerSession serverSession = serverSessionPool.getServerSession();
SpySession spySession = (SpySession) serverSession.getSession();
if (spySession.sessionConsumer == null)
throw new JMSException("Session did not have a set
MessageListner");
int loadCounter = 0;
Iterator iter = queue.iterator();
while (iter.hasNext()) {
loadCounter++;
SpyMessage message = (SpyMessage) iter.next();
spySession.sessionConsumer.addMessage(message);
if (loadCounter >= maxMessages) {
serverSession.start();
serverSession = serverSessionPool.getServerSession();
spySession = (SpySession) serverSession.getSession();
if (spySession.sessionConsumer == null)
throw new JMSException("Session did not have a
set MessageListner");
}
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyConsumer.java
Index: SpyConsumer.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.JMSException;
/**
* This class defines the interface which is used by the ConnectionReceiver to
* send messages to the consumers.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public interface SpyConsumer
{
// A ConnectionReceiver uses this method to load a Consumer with a message
public void addMessage(SpyMessage mes) throws JMSException;
// This is used the Connection class (it maintains a list of consumers) to see
who is receiving messages
public boolean isListening();
// This is used the Connection class (it maintains a list of consumers) to see
who is receiving messages
public boolean isReceiving();
// This is called by a ConnectionReceiver after it is finished loading
messages into the consumer.
public void processMessages() throws JMSException;
}