User: norbert
Date: 00/06/14 12:16:52
Modified: src/java/org/spydermq JMSServer.java SpyQueueSession.java
SpySession.java SpyTopicSession.java
SpyTopicSubscriber.java
Added: src/java/org/spydermq JMSServerMBean.java Mutex.java
Log:
Add some JMX instrumentation
change the jnp server
add a mutex object for a better synchronization
Revision Changes Path
1.4 +3 -2 spyderMQ/src/java/org/spydermq/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServer.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- JMSServer.java 2000/06/01 02:47:48 1.3
+++ JMSServer.java 2000/06/14 19:16:50 1.4
@@ -22,16 +22,17 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class JMSServer
- implements Runnable
+ implements Runnable, JMSServerMBean
{
// Constants -----------------------------------------------------
//number of threads in the pool (TO DO: this value should be dynamic)
final int NB_THREADS=1;
+ public static final String OBJECT_NAME = "JMS:service=JMSServer";
// Attributes ----------------------------------------------------
1.3 +3 -7 spyderMQ/src/java/org/spydermq/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyQueueSession.java 2000/06/05 03:19:23 1.2
+++ SpyQueueSession.java 2000/06/14 19:16:51 1.3
@@ -23,7 +23,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyQueueSession
extends SpySession
@@ -137,12 +137,8 @@
}
- //Notify the [sleeping ?] thread that there is work to do
- //We should not have to wait for the lock...
- synchronized (thread)
- {
- thread.notify();
- }
+ //Notify the [sleeping ?] thread that there is work to do
+ mutex.notifyLock();
}
}
1.3 +71 -69 spyderMQ/src/java/org/spydermq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpySession.java 2000/06/01 01:14:29 1.2
+++ SpySession.java 2000/06/14 19:16:51 1.3
@@ -28,7 +28,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpySession
implements Runnable, Session
@@ -55,7 +55,7 @@
//Is the session closed ?
boolean closed;
//This object is the object used to synchronize the session's thread - Need
fixed / improvement
- public Integer thread;
+ public Mutex mutex;
//Is this session in alpha mode ?
public boolean alphaMode;
@@ -72,7 +72,7 @@
modeStop=stop;
messageListener=null;
closed=false;
- thread=new Integer(0);
+ mutex=new Mutex();
alphaMode=true;
//Start my thread
@@ -169,74 +169,74 @@
public void run()
{
Log.log("Hi ! I'm a session thread :)");
+
+ mutex.acquireLock();
while (true) {
- synchronized (thread) {
- boolean doneJob=false;
+ boolean doneJob=false;
- if (closed) return;
+ if (closed) break;
- //look at outgoing queues
+ //look at outgoing queues
- SpyMessage outgoingJob[]=null;
+ SpyMessage outgoingJob[]=null;
- if (transacted) {
- synchronized (outgoingCommitedQueue) {
- //The session is transacted, we take
the outgoing msgs from outgoingCommitedQueue
- if (outgoingCommitedQueue.size()!=0) {
- SpyMessage array[]=new
SpyMessage[outgoingCommitedQueue.size()];
-
outgoingJob=(SpyMessage[])outgoingCommitedQueue.toArray(array);
- outgoingCommitedQueue.clear();
- }
- }
- } else {
- synchronized (outgoingQueue) {
- //The session is not transacted, we
take the outgoing msgs from outgoingQueue
- if (outgoingQueue.size()!=0) {
- SpyMessage array[]=new
SpyMessage[outgoingQueue.size()];
-
outgoingJob=(SpyMessage[])outgoingQueue.toArray(array);
- outgoingQueue.clear();
- }
+ if (transacted) {
+ synchronized (outgoingCommitedQueue) {
+ //The session is transacted, we take the
outgoing msgs from outgoingCommitedQueue
+ if (outgoingCommitedQueue.size()!=0) {
+ SpyMessage array[]=new
SpyMessage[outgoingCommitedQueue.size()];
+
outgoingJob=(SpyMessage[])outgoingCommitedQueue.toArray(array);
+ outgoingCommitedQueue.clear();
+ }
+ }
+ } else {
+ synchronized (outgoingQueue) {
+ //The session is not transacted, we take the
outgoing msgs from outgoingQueue
+ if (outgoingQueue.size()!=0) {
+ SpyMessage array[]=new
SpyMessage[outgoingQueue.size()];
+
outgoingJob=(SpyMessage[])outgoingQueue.toArray(array);
+ outgoingQueue.clear();
}
}
+ }
- if (outgoingJob!=null) {
- try {
- //Check for outdated messages !
- connection.sendToServer(outgoingJob);
- doneJob=true;
- } catch (JMSException e) {
- Log.log("Cannot send
"+outgoingJob.toString()+" to the provider...");
- Log.error(e);
- }
+ if (outgoingJob!=null) {
+ try {
+ //Check for outdated messages !
+ connection.sendToServer(outgoingJob);
+ doneJob=true;
+ } catch (JMSException e) {
+ Log.log("Cannot send
"+outgoingJob.toString()+" to the provider...");
+ Log.error(e);
}
+ }
- //if we are not in stopped mode, look at the incoming
queue
+ //if we are not in stopped mode, look at the incoming queue
- if (!modeStop) {
+ if (!modeStop) {
- Collection values = destinations.values();
- Iterator i=values.iterator();
- while (i.hasNext()) {
- SessionQueue
sessionQueue=(SessionQueue)i.next();
-
doneJob=doneJob||sessionQueue.deliverMessage();
- }
-
+ Collection values = destinations.values();
+ Iterator i=values.iterator();
+ while (i.hasNext()) {
+ SessionQueue
sessionQueue=(SessionQueue)i.next();
+ doneJob=doneJob||sessionQueue.deliverMessage();
}
- //If there was smthg to do, try again
- if (doneJob) continue;
+ }
- try {
- Log.log("SessionThread: I'm going to bed...");
- thread.wait();
- Log.log("SessionThread: I wake up");
- } catch (InterruptedException e) {
- }
+ //If there was smthg to do, try again
+ if (doneJob) continue;
+
+ Log.log("SessionThread: I'm going to bed...");
+ mutex.waitLock();
+ Log.log("SessionThread: I wake up");
- }
}
+
+ mutex.releaseLock();
+
}
public synchronized void close() throws JMSException
@@ -244,10 +244,9 @@
if (closed) return;
closed=true;
- //if the thread is sleeping, kill it
- synchronized (thread) {
- thread.notify();
- }
+ //if the thread is sleeping, kill it
+ mutex.notifyLock();
+ mutex.waitToSleep();
//notify the sleeping synchronous listeners
@@ -281,8 +280,10 @@
modeStop=true;
//Wait for the thread to sleep
- synchronized (thread) {
+ synchronized (mutex) {
+ mutex.waitToSleep();
+
//Move the outgoing messages from the outgoingQueue to the
outgoingCommitedQueue
outgoingCommitedQueue.addAll(outgoingQueue);
outgoingQueue.clear();
@@ -297,7 +298,8 @@
//We have finished our work, we can wake up the thread
modeStop=modeSav;
- thread.notify();
+ mutex.notify();
+
}
}
@@ -314,7 +316,9 @@
modeStop=true;
//Wait for the thread to sleep
- synchronized (thread) {
+ synchronized (mutex) {
+
+ mutex.waitToSleep();
//Clear the outgoing queue
outgoingQueue.clear();
@@ -329,7 +333,7 @@
//We have finished our work, we can wake up the thread
modeStop=modeSav;
- thread.notify();
+ mutex.notify();
}
}
@@ -344,8 +348,10 @@
modeStop=true;
//Wait for the thread to sleep
- synchronized (thread) {
+ synchronized (mutex) {
+ mutex.waitToSleep();
+
//Notify each SessionQueue that we are going to recover
Collection values = destinations.values();
Iterator i=values.iterator();
@@ -356,7 +362,7 @@
//We have finished our work, we can wake up the thread
modeStop=modeSav;
- thread.notify();
+ mutex.notify();
}
@@ -434,7 +440,7 @@
//The connection has changed its mode (stop() or start())
//We have to wait until message delivery has stopped or wake up the thread
void notifyStopMode(boolean newValue)
- {
+ {
if (closed) throw new IllegalStateException("The session is closed");
if (modeStop==newValue) return;
@@ -444,16 +450,12 @@
if (modeStop) {
//Wait for the thread to sleep
- synchronized (thread) {
- ;
- }
+ mutex.waitToSleep();
} else {
//Wake up the thread
- synchronized (thread) {
- thread.notify();
- }
+ mutex.notifyLock();
}
1.2 +2 -6 spyderMQ/src/java/org/spydermq/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyTopicSession.java 2000/05/31 18:06:47 1.1
+++ SpyTopicSession.java 2000/06/14 19:16:51 1.2
@@ -24,7 +24,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyTopicSession
extends SpySession
@@ -163,11 +163,7 @@
}
//notify the thread that there is work to do
- //we should change this...
- synchronized (thread)
- {
- thread.notify();
- }
+ mutex.notifyLock();
}
1.4 +3 -4 spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
Index: SpyTopicSubscriber.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyTopicSubscriber.java 2000/06/09 20:03:58 1.3
+++ SpyTopicSubscriber.java 2000/06/14 19:16:51 1.4
@@ -19,7 +19,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyTopicSubscriber
extends SpyMessageConsumer
@@ -165,9 +165,8 @@
messageListener=listener;
//Signal the change to the session thread ( it could sleep, while
there are messages for him )
- synchronized (session.thread) {
- session.thread.notify();
- }
+ session.mutex.notify();
+
}
}
1.1 spyderMQ/src/java/org/spydermq/JMSServerMBean.java
Index: JMSServerMBean.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.JMSException;
/**
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public interface JMSServerMBean
{
public SpyTopic newTopic(String name) throws JMSException;
public SpyQueue newQueue(String name) throws JMSException;
}
1.1 spyderMQ/src/java/org/spydermq/Mutex.java
Index: Mutex.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
/**
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class Mutex
{
private boolean sleeping;
private boolean locked;
private boolean work;
private Object obj;
Mutex()
{
sleeping=false;
locked=false;
work=false;
obj=new Object();
}
public synchronized void acquireLock()
{
if (locked) throw new RuntimeException("Already locked !");
locked=true;
}
public synchronized void releaseLock()
{
if (!locked) throw new RuntimeException("Not locked !");
locked=false;
this.notify();
}
public synchronized void notifyLock()
{
if (!locked) throw new RuntimeException("Not locked !");
work=true;
if (!sleeping) return;
synchronized (obj) {
obj.notify();
}
}
public synchronized void waitToSleep()
{
if (!locked) return;
if (sleeping) return;
try {
this.wait();
} catch (InterruptedException e) {
}
}
public void waitLock()
{
synchronized (this)
{
if (!locked) throw new RuntimeException("Not locked !");
if (sleeping) throw new RuntimeException("Already sleeping !");
if (work) {
work=false;
return;
}
sleeping=true;
this.notify();
}
synchronized (obj) {
try {
obj.wait();
} catch (InterruptedException e) {
}
}
synchronized (this)
{
sleeping=false;
work=false;
}
}
}