User: norbert
Date: 00/05/30 15:10:19
Modified: src/java/org/spyderMQ ConnectionQueue.java JMSServer.java
JMSServerQueue.java SessionQueue.java
SpyConnection.java SpyDistributedConnection.java
SpyQueueSession.java SpySession.java
Log:
The P2P system
Revision Changes Path
1.4 +15 -9 spyderMQ/src/java/org/spyderMQ/ConnectionQueue.java
Index: ConnectionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/ConnectionQueue.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- ConnectionQueue.java 2000/05/25 01:52:05 1.3
+++ ConnectionQueue.java 2000/05/30 22:10:17 1.4
@@ -16,7 +16,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class ConnectionQueue
{
@@ -27,7 +27,7 @@
//the SpySessions linked to this queue
public HashSet subscribers;
//Number of listening sessions
- int NumListeningSessions;
+ public int NumListeningSessions;
//My SpyConnection
SpyConnection connection;
@@ -58,17 +58,23 @@
return subscribers.size()==0;
}
- synchronized void changeNumListening(int val)
+ synchronized void changeNumListening(int val) throws JMSException
{
NumListeningSessions+=val;
- Log.log("ConnectionQueue: changeNumListening("+NumListeningSessions+")");
+ Log.log("ConnectionQueue: changeNumListening(sessions="+NumListeningSessions+")");
- /*if (val==-1&&NumListeningSubscribers==0) {
- ((SpyQueueConnection)session.connection).changeNumListening(val);
- } else if (val==1&&NumListeningSubscribers==1) {
- ((SpyQueueConnection)session.connection).changeNumListening(val);
- }*/
+ try {
+
+ if (val==-1&&NumListeningSessions==0) {
+ connection.provider.connectionListening(false,destination,connection.distributedConnection);
+ } else if (val==1&&NumListeningSessions==1) {
+ connection.provider.connectionListening(true,destination,connection.distributedConnection);
+ }
+
+ } catch (Exception e) {
+ connection.failureHandler(e,"Cannot contact the JMS server");
+ }
}
}
1.33 +11 -2 spyderMQ/src/java/org/spyderMQ/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServer.java,v
retrieving revision 1.32
retrieving revision 1.33
diff -u -r1.32 -r1.33
--- JMSServer.java 2000/05/25 01:18:33 1.32
+++ JMSServer.java 2000/05/30 22:10:17 1.33
@@ -22,7 +22,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.32 $
+ * @version $Revision: 1.33 $
*/
public class JMSServer
implements Runnable
@@ -38,7 +38,7 @@
//messages pending for a Destination ( HashMap of JMSServerQueue objects )
private HashMap messageQueue;
//list of tasks pending ( linked list of JMSServerQueue objects )
- LinkedList taskQueue;
+ LinkedList taskQueue; //look when we unregister a temporaryTopic/Queue
//last id given to a client
private int lastID;
//last id given to a temporary topic
@@ -312,4 +312,13 @@
return serverQueue.queueReceiveNoWait();
}
+
+ public void connectionListening(boolean mode,Destination dest,SpyDistributedConnection dc) throws JMSException
+ {
+ JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(dest);
+ if (serverQueue==null) throw new JMSException("This destination does not exist !");
+
+ serverQueue.connectionListening(mode,dc);
+ }
+
}
1.28 +92 -14 spyderMQ/src/java/org/spyderMQ/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServerQueue.java,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -r1.27 -r1.28
--- JMSServerQueue.java 2000/05/19 19:28:49 1.27
+++ JMSServerQueue.java 2000/05/30 22:10:17 1.28
@@ -11,14 +11,14 @@
import java.util.Iterator;
import java.util.Hashtable;
import java.util.LinkedList;
-import java.util.HashSet;
+import java.util.HashMap;
/**
* This class is a message queue which is stored (hashed by Destination) on the JMS provider
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.27 $
+ * @version $Revision: 1.28 $
*/
public class JMSServerQueue
{
@@ -27,7 +27,7 @@
//the Destination of this queue
Destination destination;
//DistributedConnection objs that have "registered" to this Destination
- HashSet subscribers;
+ private HashMap subscribers;
//List of Pending messages
private LinkedList messages;
//Is a thread already working on this queue ?
@@ -43,13 +43,15 @@
boolean isTopic;
//List of messages waiting for acknowledgment
private LinkedList messagesWaitingForAck;
+ //Nb of listeners for this Queue
+ int listeners;
// Constructor ---------------------------------------------------
JMSServerQueue(Destination dest,SpyDistributedConnection temporary,JMSServer server)
{
destination=dest;
- subscribers=new HashSet();
+ subscribers=new HashMap();
messages=new LinkedList();
threadWorking=false;
alreadyInTaskQueue=false;
@@ -57,6 +59,7 @@
this.server=server;
messagesWaitingForAck=new LinkedList();
isTopic=dest instanceof SpyTopic;
+ listeners=0;
}
// Package protected ---------------------------------------------
@@ -66,7 +69,7 @@
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage to work concurently
synchronized (destination) {
if (temporaryDestination!=null&&!temporaryDestination.equals(dc)) throw new JMSException("You cannot subscriber to this temporary destination");
- subscribers.add(dc);
+ subscribers.put(dc.getClientID(),dc);
}
}
@@ -74,7 +77,13 @@
{
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage to work concurently
synchronized (destination) {
+
+ SpyDistributedConnection distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
+ if (distributedConnection==null) return;
+ listeners-=distributedConnection.listeners;
+
subscribers.remove(dc);
+
}
}
@@ -103,9 +112,9 @@
if (isTopic) {
//if a thread is already working on this destination, I don't have to myself to the taskqueue
- if (!threadWorking) notifyWorkers();
+ if (!threadWorking) notifyWorkers();
} else {
- Log.log("Queue: addMessage");
+ if (listeners!=0&&!threadWorking) notifyWorkers();
}
}
@@ -126,6 +135,18 @@
}
}
+ synchronized SpyMessage startWorkQueue()
+ {
+ synchronized (messages) {
+
+ threadWorking=true;
+ alreadyInTaskQueue=false;
+
+ if (messages.size()==0) return null;
+ return (SpyMessage)messages.removeFirst();
+ }
+ }
+
void endWork()
{
//The thread has finished his work...
@@ -136,7 +157,7 @@
//notify another thread if there is work to do !
if (!messages.isEmpty()) notifyWorkers();
} else {
- Log.log("Queue: endWork");
+ if (listeners!=0&&!messages.isEmpty()) notifyWorkers();
}
}
}
@@ -147,7 +168,7 @@
synchronized (subscribers) {
if (subscribers.isEmpty()) return;
- Iterator i=subscribers.iterator();
+ Iterator i=subscribers.values().iterator();
while (i.hasNext()) {
SpyDistributedConnection dc=(SpyDistributedConnection)i.next();
@@ -167,7 +188,7 @@
synchronized (subscribers) {
if (subscribers.isEmpty()) return;
- Iterator i=subscribers.iterator();
+ Iterator i=subscribers.values().iterator();
while (i.hasNext()) {
SpyDistributedConnection dc=(SpyDistributedConnection)i.next();
@@ -185,7 +206,7 @@
//A connection is closing
void connectionClosing(SpyDistributedConnection dc)
{
- if (!subscribers.contains(dc)) return;
+ if (!subscribers.containsKey(dc.getClientID())) return;
Log.log("Warning: The DistributedConnection was still registered for "+destination);
removeSubscriber(dc);
}
@@ -211,7 +232,8 @@
server.connectionClosing(dc,this);
//remove this connection from the list
- i.remove();
+ if (i!=null) i.remove();
+ else subscribers.remove(dc.getClientID());
}
void doMyJob()
@@ -219,7 +241,7 @@
if (isTopic) {
//Clear the message queue
- SpyMessage[] msgs=startWork();
+ SpyMessage[] msgs=startWork();
//Let the thread do its work
if (msgs.length>1) {
@@ -240,8 +262,41 @@
} else {
- Log.log("Queue :)");
+ while (true) {
+
+ //Get a receiver - NL We could find a better receiver (load balancing ?)
+ if (listeners==0) break;
+ Iterator i=subscribers.values().iterator();
+ SpyDistributedConnection dc=null;
+ while (i.hasNext()) {
+ dc=(SpyDistributedConnection)i.next();
+ if (dc.listeners!=0) break;
+ }
+ if (dc==null||dc.listeners==0) {
+ listeners=0;
+ Log.log("WARNING: The listeners count was invalid !");
+ break;
+ }
+
+ //Get the message
+ SpyMessage mes=startWorkQueue();
+ if (mes==null) break;
+ if (mes.isOutdated()) continue;
+
+ //Send the message
+ try {
+ dc.cr.receive(destination,mes);
+ } catch (Exception e) {
+ Log.error("Cannot deliver this message to the client "+dc);
+ Log.error(e);
+ handleConnectionFailure(dc,null);
+ }
+
+ }
+ //Notify that it has finished its work : another thread can start working on this queue
+ endWork();
+
}
}
@@ -253,4 +308,27 @@
}
}
+ void connectionListening(boolean mode,SpyDistributedConnection dc) throws JMSException
+ {
+ SpyDistributedConnection distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
+ if (distributedConnection==null) throw new JMSException("This DistributedConnection is not registered");
+
+ if (mode) {
+ distributedConnection.listeners++;
+ listeners++;
+
+ if (listeners==1&&!threadWorking)
+ synchronized (messages) {
+ if (!messages.isEmpty()) notifyWorkers();
+ }
+
+ } else {
+ distributedConnection.listeners--;
+ listeners--;
+ }
+
+ Log.log("Listeners for "+destination+" = "+listeners);
+
+ }
+
}
1.18 +44 -3 spyderMQ/src/java/org/spyderMQ/SessionQueue.java
Index: SessionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SessionQueue.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- SessionQueue.java 2000/05/25 01:52:05 1.17
+++ SessionQueue.java 2000/05/30 22:10:17 1.18
@@ -21,7 +21,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.17 $
+ * @version $Revision: 1.18 $
*/
public class SessionQueue
{
@@ -32,9 +32,9 @@
//List of messages waiting for acknoledgment
LinkedList messagesWaitingForAck;
//the MessageConsumers linked to this queue
- HashSet subscribers;
+ public HashSet subscribers;
//Number of listening receivers
- int NumListeningSubscribers;
+ public int NumListeningSubscribers;
//My SpySession
SpySession session;
@@ -175,5 +175,46 @@
}
}
+
+
+ public void dispatchMessage(Destination dest, SpyMessage mes) throws JMSException
+ {
+ Log.log("SessionQueue: dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
+
+ if (session.closed) throw new IllegalStateException("The session is closed");
+ if (NumListeningSubscribers==0) throw new JMSException("There is no receiver for this queue !"); //We should catch this error in the JMSServerQueue object
+ if (mes.isOutdated()) return;
+
+ Iterator i=subscribers.iterator();
+ SpyQueueReceiver receiver=null;
+ while (i.hasNext()) {
+ receiver=(SpyQueueReceiver)i.next();
+ if (receiver.listening) break;
+ }
+ if (receiver==null||!receiver.listening) {
+ NumListeningSubscribers=0;
+ Log.log("WARNING: The listeners count was invalid !");
+ throw new JMSException("There is no receiver for this queue !"); //We should catch this error in the JMSServerQueue object
+ }
+
+
+ synchronized (receiver.messages) {
+
+ if (receiver.messageListener==null) {
+ if (!receiver.waitInReceive) throw new JMSException("There is no receiver for this queue !"); //We should catch this error in the JMSServerQueue object
+ receiver.addMessage(mes);
+ receiver.messages.notify();
+ } else {
+ receiver.addMessage(mes);
+ receiver.messageListener.onMessage(mes);
+ }
+
+ }
+
+
+ }
+
+
+
}
1.32 +2 -2 spyderMQ/src/java/org/spyderMQ/SpyConnection.java
Index: SpyConnection.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyConnection.java,v
retrieving revision 1.31
retrieving revision 1.32
diff -u -r1.31 -r1.32
--- SpyConnection.java 2000/05/26 22:37:49 1.31
+++ SpyConnection.java 2000/05/30 22:10:18 1.32
@@ -29,7 +29,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.31 $
+ * @version $Revision: 1.32 $
*/
public class SpyConnection
implements Connection, Serializable
@@ -373,7 +373,7 @@
}
}
- protected void failureHandler(Exception e,String reason) throws JMSException
+ public void failureHandler(Exception e,String reason) throws JMSException
{
Log.error(e);
1.5 +2 -1 spyderMQ/src/java/org/spyderMQ/SpyDistributedConnection.java
Index: SpyDistributedConnection.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyDistributedConnection.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyDistributedConnection.java 2000/05/18 02:10:04 1.4
+++ SpyDistributedConnection.java 2000/05/30 22:10:18 1.5
@@ -14,13 +14,14 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyDistributedConnection
implements Serializable
{
private String clientID;
public ConnectionReceiver cr;
+ public transient int listeners;
SpyDistributedConnection(String id,ConnectionReceiver cr_)
{
1.11 +3 -5 spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- SpyQueueSession.java 2000/05/25 01:52:05 1.10
+++ SpyQueueSession.java 2000/05/30 22:10:18 1.11
@@ -23,7 +23,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.10 $
+ * @version $Revision: 1.11 $
*/
public class SpyQueueSession
extends SpySession
@@ -76,7 +76,7 @@
{
if (closed) throw new IllegalStateException("The session is closed");
- //Not implemented yet
+ //Not yet implemented
return createReceiver(queue);
}
@@ -99,9 +99,7 @@
//Called by the ConnectionReceiver object : put a new msg in the receiver's queue
public void dispatchMessage(Destination dest,SpyMessage mes) throws JMSException
{
- if (closed) throw new IllegalStateException("The session is closed");
-
- //Not implemented yet
+ //Done in the SessionQueue :)
}
1.21 +2 -2 spyderMQ/src/java/org/spyderMQ/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpySession.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -r1.20 -r1.21
--- SpySession.java 2000/05/26 22:37:49 1.20
+++ SpySession.java 2000/05/30 22:10:18 1.21
@@ -28,7 +28,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.20 $
+ * @version $Revision: 1.21 $
*/
public class SpySession
implements Runnable, Session
@@ -45,7 +45,7 @@
//The connection object to which this session is linked
protected SpyConnection connection;
//HashMap of SessionQueue by Destination
- HashMap destinations;
+ public HashMap destinations;
//The outgoing message queue
protected LinkedList outgoingQueue;
//The outgoing message queue for messages that have been commited (if the session is transacted)