User: norbert
Date: 00/05/18 13:21:05
Modified: src/java/org/spyderMQ JMSServer.java JMSServerQueue.java
SessionQueue.java SpyMessageConsumer.java
SpyQueueSession.java SpySession.java
SpyTopicPublisher.java SpyTopicSession.java
SpyTopicSubscriber.java
Log:
second step for queues
Revision Changes Path
1.29 +8 -25 spyderMQ/src/java/org/spyderMQ/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServer.java,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -r1.28 -r1.29
--- JMSServer.java 2000/05/18 00:16:09 1.28
+++ JMSServer.java 2000/05/18 20:20:56 1.29
@@ -20,7 +20,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.28 $
+ * @version $Revision: 1.29 $
*/
public class JMSServer
implements Runnable
@@ -83,8 +83,9 @@
synchronized (taskQueue)
{
while (queue==null) {
-
- int size=taskQueue.size(); // size() is O(1) in LinkedList...
+
+ // size() is O(1) in LinkedList...
+ int size=taskQueue.size();
if (size!=0) {
queue=(JMSServerQueue)taskQueue.removeFirst();
//One other thread can start working on the task queue...
@@ -99,28 +100,10 @@
}
}
- }
-
- //Clear the message queue for this destination
- SpyMessage[] msgs=queue.startWork();
-
- //Let the thread do its work
- if ((queue.destination instanceof Topic)&&msgs.length>1) {
- //We can send multiple messages
- Log.log(" Send Msgs[1.."+msgs.length+"]");
- queue.sendMultipleMessages(msgs);
- } else {
- //Send each message
- for(int i=0;i<msgs.length;i++) {
- SpyMessage message=(SpyMessage)msgs[i];
- Log.log(" Send Msg : "+message.toString());
- if (!message.isOutdated())
- queue.sendOneMessage(message);
- }
}
-
- //Notify that it has finished his work : now, another thread can start working on this destination
- queue.endWork();
+
+ //Ask the queue to do its job
+ queue.doMyJob();
}
@@ -191,7 +174,7 @@
//A connection has send a new message
public void newMessage(SpyMessage val,String id) throws JMSException
{
- Log.log("JMSserver: newMessage(val="+val.toString()+")");
+ Log.log("JMSserver: newMessage(dest="+val.jmsDestination+",val="+val.toString()+")");
JMSServerQueue queue=(JMSServerQueue)messageQueue.get(val.jmsDestination);
if (queue==null) throw new JMSException("This destination does not exist !");
1.26 +58 -7 spyderMQ/src/java/org/spyderMQ/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServerQueue.java,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -r1.25 -r1.26
--- JMSServerQueue.java 2000/05/18 00:16:10 1.25
+++ JMSServerQueue.java 2000/05/18 20:20:56 1.26
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.25 $
+ * @version $Revision: 1.26 $
*/
public class JMSServerQueue
{
@@ -39,8 +39,11 @@
SpyDistributedConnection temporaryDestination;
//The JMSServer object
private JMSServer server;
+ //Am I a queue or a topic
+ boolean isTopic;
+ //List of messages waiting for acknowledgment
+ private LinkedList messagesWaitingForAck;
-
// Constructor ---------------------------------------------------
JMSServerQueue(Destination dest,SpyDistributedConnection temporary,JMSServer server)
@@ -52,6 +55,14 @@
alreadyInTaskQueue=false;
temporaryDestination=temporary;
this.server=server;
+
+ if (dest instanceof SpyTopic) {
+ isTopic=true;
+ messagesWaitingForAck=null;
+ } else {
+ isTopic=false;
+ messagesWaitingForAck=new LinkedList();
+ }
}
// Package protected ---------------------------------------------
@@ -96,8 +107,13 @@
messages.add(i,mes);
}
- //if a thread is already working on this destination, I don't have to myself to the taskqueue
- if (!threadWorking) notifyWorkers();
+ if (isTopic) {
+ //if a thread is already working on this destination, I don't have to myself to the taskqueue
+ if (!threadWorking) notifyWorkers();
+ } else {
+ Log.log("Queue: addMessage");
+ }
+
}
}
@@ -122,8 +138,12 @@
threadWorking=false;
synchronized (messages) {
- //notify another thread if there is work to do !
- if (!messages.isEmpty()) notifyWorkers();
+ if (isTopic) {
+ //notify another thread if there is work to do !
+ if (!messages.isEmpty()) notifyWorkers();
+ } else {
+ Log.log("Queue: endWork");
+ }
}
}
@@ -168,7 +188,7 @@
}
}
- //A connection is closing !
+ //A connection is closing
void connectionClosing(SpyDistributedConnection dc)
{
if (!subscribers.contains(dc)) return;
@@ -198,6 +218,37 @@
//remove this connection from the list
i.remove();
+ }
+
+ void doMyJob()
+ {
+ if (isTopic) {
+
+ //Clear the message queue
+ SpyMessage[] msgs=startWork();
+
+ //Let the thread do its work
+ if (msgs.length>1) {
+ //We can send multiple messages
+ Log.log("Send Msgs[1.."+msgs.length+"]");
+ sendMultipleMessages(msgs);
+ } else {
+ //Send each message
+ for(int i=0;i<msgs.length;i++) {
+ SpyMessage message=(SpyMessage)msgs[i];
+ Log.log("Send one msg("+message.toString()+")");
+ if (!message.isOutdated()) sendOneMessage(message);
+ }
+ }
+
+ //Notify that it has finished its work : another thread can start working on this queue
+ endWork();
+
+ } else {
+
+ Log.log("Queue :)");
+
+ }
}
}
1.14 +15 -15 spyderMQ/src/java/org/spyderMQ/SessionQueue.java
Index: SessionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SessionQueue.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -r1.13 -r1.14
--- SessionQueue.java 2000/05/17 23:43:08 1.13
+++ SessionQueue.java 2000/05/18 20:20:57 1.14
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.13 $
+ * @version $Revision: 1.14 $
*/
public class SessionQueue
{
@@ -29,7 +29,7 @@
//List of Pending messages (not yet delivered)
LinkedList messages;
//List of messages waiting for acknoledgment
- LinkedList messagesWaitForAck;
+ LinkedList messagesWaitingForAck;
//Is the consumer sleeping in a receive() ?
boolean waitInReceive;
//Is the session transacted ?
@@ -42,7 +42,7 @@
SessionQueue(boolean tr,int am)
{
messages=new LinkedList();
- messagesWaitForAck=new LinkedList();
+ messagesWaitingForAck=new LinkedList();
waitInReceive=false;
transacted=tr;
acknowledgeMode=am;
@@ -140,9 +140,9 @@
if (!transacted) {
if
(acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
- synchronized
(messagesWaitForAck) {
+ synchronized
(messagesWaitingForAck) {
//Put the message in
the messagesWaitForAck queue
-
messagesWaitForAck.addLast(message);
+
messagesWaitingForAck.addLast(message);
}
message.setSessionQueue(this);
@@ -157,9 +157,9 @@
//We are linked to a transacted
session
- synchronized (messagesWaitForAck) {
+ synchronized (messagesWaitingForAck) {
//Put the message in the
messagesWaitForAck queue
-
messagesWaitForAck.addLast(message);
+
messagesWaitingForAck.addLast(message);
}
}
@@ -181,14 +181,14 @@
{
Log.log("SessionQueue: acknowledge("+mes.toString()+")");
- synchronized (messagesWaitForAck) {
+ synchronized (messagesWaitingForAck) {
- int pos=messagesWaitForAck.indexOf(mes);
+ int pos=messagesWaitingForAck.indexOf(mes);
if (pos==-1) return;
for(int i=0;i<=pos;i++)
- messagesWaitForAck.removeFirst();
+ messagesWaitingForAck.removeFirst();
}
@@ -198,12 +198,12 @@
void recover() throws JMSException
{
synchronized (messages) {
- synchronized (messagesWaitForAck) {
+ synchronized (messagesWaitingForAck) {
- while (messagesWaitForAck.size()!=0) {
+ while (messagesWaitingForAck.size()!=0) {
//Get the most recent unacknowledged message
- SpyMessage
mes=(SpyMessage)messagesWaitForAck.removeLast();
+ SpyMessage
mes=(SpyMessage)messagesWaitingForAck.removeLast();
//This message is redelivered
mes.setJMSRedelivered(true);
@@ -220,8 +220,8 @@
//the session is about to commit, we have to clear our messagesWaitForAck queue
void commit()
{
- synchronized (messagesWaitForAck) {
- messagesWaitForAck.clear();
+ synchronized (messagesWaitingForAck) {
+ messagesWaitingForAck.clear();
}
}
1.10 +12 -97 spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- SpyMessageConsumer.java 2000/05/17 23:43:08 1.9
+++ SpyMessageConsumer.java 2000/05/18 20:20:57 1.10
@@ -15,11 +15,11 @@
import org.spydermq.selectors.Selector;
/**
- * This class implements javax.jms.MessageConsumer
+ * This class implements javax.jms.MessageConsumer - Going to be deprecated
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SpyMessageConsumer
implements MessageConsumer
@@ -30,8 +30,6 @@
protected SpySession session;
//My message listener (null if none)
public MessageListener messageListener;
- //A link to my session queue (in my session)
- protected SessionQueue mySessionQueue;
//Am I closed ?
protected boolean closed;
//Do I have a selector
@@ -39,10 +37,9 @@
// Constructor ---------------------------------------------------
- SpyMessageConsumer(SpySession s,SessionQueue sq)
+ SpyMessageConsumer(SpySession s)
{
session=s;
- mySessionQueue=sq;
messageListener=null;
closed=false;
selector=null;
@@ -66,113 +63,31 @@
}
public void setMessageListener(MessageListener listener) throws JMSException
- {
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
-
- messageListener=listener;
-
- //Signal the change to the session thread ( it could sleep, while
there are messages for him )
- synchronized (session.thread) {
- session.thread.notify();
- }
+ {
+ //Job is done in the inherited classes
}
public Message receive() throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
-
- synchronized (mySessionQueue.messages) {
-
- //if the client follows the specification [4.4.6], he cannot
use this session
- //to asynchronously receive a message or receive() from
another thread.
- //If a message is already pending for this session, we can
immediatly deliver it
-
- while (true) {
-
- if (closed) return null;
-
- if (!session.modeStop) {
- Message
mes=mySessionQueue.getMessage(selector);
- if (mes!=null) return mes;
- } else Log.log("the connection is stopped !");
-
- try {
- mySessionQueue.waitInReceive=true;
- mySessionQueue.messages.wait();
- } catch (InterruptedException e) {
- } finally {
- mySessionQueue.waitInReceive=false;
- }
-
- }
- }
+ //Job is done in the inherited classes
+ return null;
}
public Message receive(long timeOut) throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
-
- if (timeOut==0) return receive();
-
- long endTime=(new Date()).getTime()+timeOut;
-
- synchronized (mySessionQueue.messages) {
-
- //if the client respects the specification [4.4.6], he cannot
use this session
- //to asynchronously receive a message or receive() from
another thread.
- //If a message is already pending for this session, we can
deliver it
-
- while (true) {
-
- if (closed) return null;
-
- if (!session.modeStop) {
- Message
mes=mySessionQueue.getMessage(selector);
- if (mes!=null) return mes;
- } else Log.log("the connection is stopped !");
-
- long att=endTime-((new Date()).getTime());
- if (att<=0) return null;
-
- try {
- mySessionQueue.waitInReceive=true;
- mySessionQueue.messages.wait(att);
- } catch (InterruptedException e) {
- } finally {
- mySessionQueue.waitInReceive=false;
- }
-
- }
- }
-
+ //Job is done in the inherited classes
+ return null;
}
public Message receiveNoWait() throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
-
- synchronized (mySessionQueue.messages) {
-
- while (true) {
- if (session.modeStop) return null;
- return mySessionQueue.getMessage(selector);
- }
-
- }
+ //Job is done in the inherited classes
+ return null;
}
public synchronized void close() throws JMSException
{
- if (closed) return;
- closed=true;
-
- if (mySessionQueue.waitInReceive&&messageListener==null) {
-
- //A consumer could be waiting in receive()
- synchronized (mySessionQueue.messages) {
- mySessionQueue.messages.notify();
- }
- }
+ //Job is done in the inherited classes
}
}
1.5 +11 -6 spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyQueueSession.java 2000/05/15 02:56:26 1.4
+++ SpyQueueSession.java 2000/05/18 20:20:57 1.5
@@ -23,7 +23,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyQueueSession
extends SpySession
@@ -34,7 +34,7 @@
SpyQueueSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
{
- super(myConnection,transacted,acknowledgeMode,stop);
+ super(myConnection,transacted,acknowledgeMode,stop,false);
}
// Public --------------------------------------------------------
@@ -66,8 +66,8 @@
{
if (closed) throw new IllegalStateException("The session is closed");
- //Not implemented yet
- return null;
+ SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue,modeStop);
+ return receiver;
}
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws
JMSException
@@ -138,8 +138,8 @@
}
- //notify the thread that there is work to do
- //we should change this...
+ //Notify the [sleeping ?] thread that there is work to do
+ //We should not wait for the lock...
synchronized (thread)
{
thread.notify();
@@ -156,6 +156,11 @@
void removeConsumer(Destination dest, SessionQueue who) throws JMSException
{
//Not implemented yet
+ }
+
+ //One receiver is changing its mode
+ void notifyStopChange(SpyQueueReceiver receiver,boolean newMode)
+ {
}
}
1.13 +38 -120 spyderMQ/src/java/org/spyderMQ/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpySession.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- SpySession.java 2000/05/15 19:40:17 1.12
+++ SpySession.java 2000/05/18 20:20:57 1.13
@@ -28,7 +28,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public class SpySession
implements Runnable, Session
@@ -56,10 +56,12 @@
boolean closed;
//This object is the object used to synchronize the session's thread
public Integer thread;
+ //Am I linked to a topic
+ public boolean isTopic;
// Constructor ---------------------------------------------------
- SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop)
+ SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean
stop,boolean isTopic)
{
connection=conn;
transacted=trans;
@@ -70,6 +72,7 @@
modeStop=stop;
closed=false;
thread=new Integer(0);
+ this.isTopic=isTopic;
//Start one thread for each session
Thread oneThread=new Thread(this);
@@ -146,111 +149,7 @@
return transacted;
}
- //Commit a transacted session
- public synchronized void commit() throws JMSException
- {
- if (closed) throw new IllegalStateException("The session is closed");
- if (!transacted) throw new IllegalStateException("The session is not
transacted");
-
- Log.log("Session: commit()");
-
- boolean modeSav=modeStop;
- modeStop=true;
-
- //Wait for the thread to sleep
- synchronized (thread) {
-
- //Move the outgoing messages from the outgoingQueue to the
outgoingCommitedQueue
- outgoingCommitedQueue.addAll(outgoingQueue);
- outgoingQueue.clear();
-
- //Notify each SessionQueue that we are going to commit
- Collection values = subscribers.values();
- Iterator i1=values.iterator();
- while (i1.hasNext()) {
- HashSet set=(HashSet)i1.next();
- Iterator i2=set.iterator();
- while (i2.hasNext()) {
- SessionQueue
sessionQueue=(SessionQueue)i2.next();
- sessionQueue.commit();
- }
- }
-
- //We have finished our work, we can wake up the thread
- modeStop=modeSav;
- thread.notify();
- }
-
- }
-
- //Rollback a transacted session
- public synchronized void rollback() throws JMSException
- {
- if (closed) throw new IllegalStateException("The session is closed");
- if (!transacted) throw new IllegalStateException("The session is not
transacted");
-
- Log.log("Session: rollback()");
-
- boolean modeSav=modeStop;
- modeStop=true;
-
- //Wait for the thread to sleep
- synchronized (thread) {
-
- //Clear the outgoing queue
- outgoingQueue.clear();
-
- //Notify each SessionQueue that we are going to rollback
- Collection values = subscribers.values();
- Iterator i1=values.iterator();
- while (i1.hasNext()) {
- HashSet set=(HashSet)i1.next();
- Iterator i2=set.iterator();
- while (i2.hasNext()) {
- SessionQueue
sessionQueue=(SessionQueue)i2.next();
- sessionQueue.recover();
- }
- }
-
- //We have finished our work, we can wake up the thread
- modeStop=modeSav;
- thread.notify();
- }
- }
-
- public synchronized void recover() throws JMSException
- {
- if (closed) throw new IllegalStateException("The session is closed");
- if (transacted) throw new IllegalStateException("The session is
transacted");
-
- Log.log("Session: recover()");
- boolean modeSav=modeStop;
- modeStop=true;
-
- //Wait for the thread to sleep
- synchronized (thread) {
-
- //Notify each SessionQueue that we are going to recover
- Collection values = subscribers.values();
- Iterator i1=values.iterator();
- while (i1.hasNext()) {
- HashSet set=(HashSet)i1.next();
- Iterator i2=set.iterator();
- while (i2.hasNext()) {
- SessionQueue
sessionQueue=(SessionQueue)i2.next();
- sessionQueue.recover();
- }
- }
-
- //We have finished our work, we can wake up the thread
- modeStop=modeSav;
- thread.notify();
- }
-
-
- }
-
public MessageListener getMessageListener() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
@@ -309,21 +208,25 @@
}
}
- //if we are not in stopped mode, look at incoming
queues
+ //there is no incoming queue in the SpyQueueSession
- if (!modeStop) {
+ if (isTopic) {
- Collection values = subscribers.values();
- Iterator i1=values.iterator();
- while (i1.hasNext()) {
- HashSet set=(HashSet)i1.next();
- Iterator i2=set.iterator();
- while (i2.hasNext()) {
- SessionQueue
sessionQueue=(SessionQueue)i2.next();
-
doneJob=doneJob||sessionQueue.deliverMessage();
- }
- }
-
+ //if we are not in stopped mode, look at the
incoming queue
+ if (!modeStop) {
+
+ Collection values =
subscribers.values();
+ Iterator i1=values.iterator();
+ while (i1.hasNext()) {
+ HashSet set=(HashSet)i1.next();
+ Iterator i2=set.iterator();
+ while (i2.hasNext()) {
+ SessionQueue
sessionQueue=(SessionQueue)i2.next();
+
doneJob=doneJob||sessionQueue.deliverMessage();
+ }
+ }
+
+ }
}
//If there were smthg to do, try again
@@ -371,9 +274,24 @@
public void dispatchMessage(Destination dest, SpyMessage mes) throws
JMSException
{
- //The job is done in the inherited classes
+ //The job is done in inherited classes
}
+ public void commit() throws JMSException
+ {
+ //The job is done in inherited classes
+ }
+
+ public void rollback() throws JMSException
+ {
+ //The job is done in inherited classes
+ }
+
+ public void recover() throws JMSException
+ {
+ //The job is done in inherited classes
+ }
+
public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
{
Log.log("SpySession: deleteDestination(dest="+dest.toString()+")");
1.8 +2 -1 spyderMQ/src/java/org/spyderMQ/SpyTopicPublisher.java
Index: SpyTopicPublisher.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicPublisher.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SpyTopicPublisher.java 2000/05/15 02:08:59 1.7
+++ SpyTopicPublisher.java 2000/05/18 20:20:57 1.8
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class SpyTopicPublisher
extends SpyMessageProducer
@@ -94,4 +94,5 @@
//We must put a 'new message' in the Session's outgoing queue [3.9]
mySession.sendMessage(message.myClone());
}
+
}
1.21 +110 -2 spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -r1.20 -r1.21
--- SpyTopicSession.java 2000/05/17 23:43:08 1.20
+++ SpyTopicSession.java 2000/05/18 20:20:57 1.21
@@ -13,6 +13,7 @@
import javax.jms.JMSException;
import javax.jms.TopicPublisher;
import javax.jms.TemporaryTopic;
+import java.util.Collection;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -22,7 +23,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.20 $
+ * @version $Revision: 1.21 $
*/
public class SpyTopicSession
extends SpySession
@@ -33,7 +34,7 @@
SpyTopicSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
{
- super(myConnection,transacted,acknowledgeMode,stop);
+ super(myConnection,transacted,acknowledgeMode,stop,true);
}
// Public --------------------------------------------------------
@@ -96,6 +97,113 @@
public void unsubscribe(String name) throws JMSException
{
//Not implemented yet
+ }
+
+ //overides SpySession
+
+ //Commit a transacted session
+ public synchronized void commit() throws JMSException
+ {
+ if (closed) throw new IllegalStateException("The session is closed");
+ if (!transacted) throw new IllegalStateException("The session is not
transacted");
+
+ Log.log("Session: commit()");
+
+ boolean modeSav=modeStop;
+ modeStop=true;
+
+ //Wait for the thread to sleep
+ synchronized (thread) {
+
+ //Move the outgoing messages from the outgoingQueue to the
outgoingCommitedQueue
+ outgoingCommitedQueue.addAll(outgoingQueue);
+ outgoingQueue.clear();
+
+ //Notify each SessionQueue that we are going to commit
+ Collection values = subscribers.values();
+ Iterator i1=values.iterator();
+ while (i1.hasNext()) {
+ HashSet set=(HashSet)i1.next();
+ Iterator i2=set.iterator();
+ while (i2.hasNext()) {
+ SessionQueue
sessionQueue=(SessionQueue)i2.next();
+ sessionQueue.commit();
+ }
+ }
+
+ //We have finished our work, we can wake up the thread
+ modeStop=modeSav;
+ thread.notify();
+ }
+
+ }
+
+ //Rollback a transacted session
+ public synchronized void rollback() throws JMSException
+ {
+ if (closed) throw new IllegalStateException("The session is closed");
+ if (!transacted) throw new IllegalStateException("The session is not
transacted");
+
+ Log.log("Session: rollback()");
+
+ boolean modeSav=modeStop;
+ modeStop=true;
+
+ //Wait for the thread to sleep
+ synchronized (thread) {
+
+ //Clear the outgoing queue
+ outgoingQueue.clear();
+
+ //Notify each SessionQueue that we are going to rollback
+ Collection values = subscribers.values();
+ Iterator i1=values.iterator();
+ while (i1.hasNext()) {
+ HashSet set=(HashSet)i1.next();
+ Iterator i2=set.iterator();
+ while (i2.hasNext()) {
+ SessionQueue
sessionQueue=(SessionQueue)i2.next();
+ sessionQueue.recover();
+ }
+ }
+
+ //We have finished our work, we can wake up the thread
+ modeStop=modeSav;
+ thread.notify();
+ }
+ }
+
+ public synchronized void recover() throws JMSException
+ {
+ if (closed) throw new IllegalStateException("The session is closed");
+ if (transacted) throw new IllegalStateException("The session is
transacted");
+
+ Log.log("Session: recover()");
+
+ boolean modeSav=modeStop;
+ modeStop=true;
+
+ //Wait for the thread to sleep
+ synchronized (thread) {
+
+ //Notify each SessionQueue that we are going to recover
+ Collection values = subscribers.values();
+ Iterator i1=values.iterator();
+ while (i1.hasNext()) {
+ HashSet set=(HashSet)i1.next();
+ Iterator i2=set.iterator();
+ while (i2.hasNext()) {
+ SessionQueue
sessionQueue=(SessionQueue)i2.next();
+ sessionQueue.recover();
+ }
+ }
+
+ //We have finished our work, we can wake up the thread
+ modeStop=modeSav;
+ thread.notify();
+ }
+
+
}
//Not part of the spec
1.8 +117 -3 spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java
Index: SpyTopicSubscriber.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SpyTopicSubscriber.java 2000/05/17 23:43:08 1.7
+++ SpyTopicSubscriber.java 2000/05/18 20:20:57 1.8
@@ -9,6 +9,9 @@
import javax.jms.TopicSubscriber;
import javax.jms.JMSException;
import javax.jms.Topic;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.util.Date;
import org.spydermq.selectors.Selector;
@@ -17,7 +20,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class SpyTopicSubscriber
extends SpyMessageConsumer
@@ -27,13 +30,16 @@
//The topic I registered
private Topic topic;
+ //A link to my session queue (in my session)
+ private SessionQueue mySessionQueue;
// Constructor ---------------------------------------------------
SpyTopicSubscriber(SpyTopicSession s,SessionQueue sq,Topic t)
{
- super(s,sq);
+ super(s);
topic=t;
+ mySessionQueue=sq;
}
// Public --------------------------------------------------------
@@ -55,8 +61,116 @@
public void close() throws JMSException
{
+ if (closed) return;
+ closed=true;
+
session.removeConsumer(topic,mySessionQueue);
- super.close();
+
+ if (mySessionQueue.waitInReceive&&messageListener==null) {
+
+ //A consumer could be waiting in receive()
+ synchronized (mySessionQueue.messages) {
+ mySessionQueue.messages.notify();
+ }
+ }
+ }
+
+ //Overrides MessageConsumer
+
+ public Message receive() throws JMSException
+ {
+ if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+
+ synchronized (mySessionQueue.messages) {
+
+ //if the client follows the specification [4.4.6], he cannot
use this session
+ //to asynchronously receive a message or receive() from
another thread.
+ //If a message is already pending for this session, we can
immediatly deliver it
+
+ while (true) {
+
+ if (closed) return null;
+
+ if (!session.modeStop) {
+ Message
mes=mySessionQueue.getMessage(selector);
+ if (mes!=null) return mes;
+ } else Log.log("the connection is stopped !");
+
+ try {
+ mySessionQueue.waitInReceive=true;
+ mySessionQueue.messages.wait();
+ } catch (InterruptedException e) {
+ } finally {
+ mySessionQueue.waitInReceive=false;
+ }
+
+ }
+ }
+ }
+
+ public Message receive(long timeOut) throws JMSException
+ {
+ if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+
+ if (timeOut==0) return receive();
+
+ long endTime=(new Date()).getTime()+timeOut;
+
+ synchronized (mySessionQueue.messages) {
+
+ //if the client respects the specification [4.4.6], he cannot
use this session
+ //to asynchronously receive a message or receive() from
another thread.
+ //If a message is already pending for this session, we can
deliver it
+
+ while (true) {
+
+ if (closed) return null;
+
+ if (!session.modeStop) {
+ Message
mes=mySessionQueue.getMessage(selector);
+ if (mes!=null) return mes;
+ } else Log.log("the connection is stopped !");
+
+ long att=endTime-((new Date()).getTime());
+ if (att<=0) return null;
+
+ try {
+ mySessionQueue.waitInReceive=true;
+ mySessionQueue.messages.wait(att);
+ } catch (InterruptedException e) {
+ } finally {
+ mySessionQueue.waitInReceive=false;
+ }
+
+ }
+ }
+
+ }
+
+ public Message receiveNoWait() throws JMSException
+ {
+ if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+
+ synchronized (mySessionQueue.messages) {
+
+ while (true) {
+ if (session.modeStop) return null;
+ return mySessionQueue.getMessage(selector);
+ }
+
+ }
+ }
+
+ public void setMessageListener(MessageListener listener) throws JMSException
+ {
+ if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+
+ messageListener=listener;
+
+ //Signal the change to the session thread ( it could sleep, while
there are messages for him )
+ synchronized (session.thread) {
+ session.thread.notify();
+ }
}
// ----- Debug only ----- [not part of the spec]