User: norbert
Date: 00/05/24 18:52:06
Modified: src/java/org/spyderMQ ConnectionQueue.java SessionQueue.java
SpyQueueConnection.java SpyQueueReceiver.java
SpyQueueSession.java SpySession.java
SpyTopicSession.java
Log:
More for P2P :
Create a new class ( ConnectionQueue ) which holds the subscribers HashSet
Revision Changes Path
1.3 +14 -1 spyderMQ/src/java/org/spyderMQ/ConnectionQueue.java
Index: ConnectionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/ConnectionQueue.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- ConnectionQueue.java 2000/05/25 01:22:18 1.2
+++ ConnectionQueue.java 2000/05/25 01:52:05 1.3
@@ -16,7 +16,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class ConnectionQueue
{
@@ -58,4 +58,17 @@
return subscribers.size()==0;
}
+ synchronized void changeNumListening(int val)
+ {
+ NumListeningSessions+=val;
+
+ Log.log("ConnectionQueue:
changeNumListening("+NumListeningSessions+")");
+
+ /*if (val==-1&&NumListeningSubscribers==0) {
+
((SpyQueueConnection)session.connection).changeNumListening(val);
+ } else if (val==1&&NumListeningSubscribers==1) {
+
((SpyQueueConnection)session.connection).changeNumListening(val);
+ }*/
+
+ }
}
1.17 +11 -7 spyderMQ/src/java/org/spyderMQ/SessionQueue.java
Index: SessionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SessionQueue.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- SessionQueue.java 2000/05/25 01:18:33 1.16
+++ SessionQueue.java 2000/05/25 01:52:05 1.17
@@ -10,6 +10,7 @@
import javax.jms.MessageConsumer;
import javax.jms.JMSException;
import javax.jms.Session;
+import javax.jms.Destination;
import java.util.LinkedList;
import java.util.HashSet;
import java.util.Iterator;
@@ -20,12 +21,14 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.16 $
+ * @version $Revision: 1.17 $
*/
public class SessionQueue
{
// Attributes ----------------------------------------------------
+ //My destination
+ Destination destination;
//List of messages waiting for acknoledgment
LinkedList messagesWaitingForAck;
//the MessageConsumers linked to this queue
@@ -37,11 +40,12 @@
// Constructor ---------------------------------------------------
- SessionQueue(SpySession session)
+ SessionQueue(SpySession session,Destination destination)
{
messagesWaitingForAck=new LinkedList();
subscribers=new HashSet();
this.session=session;
+ this.destination=destination;
NumListeningSubscribers=0;
}
@@ -160,14 +164,14 @@
return subscribers.size()==0;
}
- synchronized void changeNumListening(int val)
+ synchronized void changeNumListening(int val) throws JMSException
{
NumListeningSubscribers+=val;
- if (val==-1&&NumListeningSubscribers==0) {
-
((SpyQueueConnection)session.connection).changeNumListening(val);
- } else if (val==1&&NumListeningSubscribers==1) {
-
((SpyQueueConnection)session.connection).changeNumListening(val);
+ if
((val==-1&&NumListeningSubscribers==0)||(val==1&&NumListeningSubscribers==1)) {
+ ConnectionQueue
connectionQueue=(ConnectionQueue)session.connection.destinations.get(destination);
+ if (connectionQueue==null) throw new JMSException("There is NO
ConnectionQueue for this Destination in the SpyConnection !");
+ connectionQueue.changeNumListening(val);
}
}
1.7 +1 -6 spyderMQ/src/java/org/spyderMQ/SpyQueueConnection.java
Index: SpyQueueConnection.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueConnection.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SpyQueueConnection.java 2000/05/25 01:18:33 1.6
+++ SpyQueueConnection.java 2000/05/25 01:52:05 1.7
@@ -21,7 +21,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class SpyQueueConnection
extends SpyConnection
@@ -100,11 +100,6 @@
failureHandler(e,"Cannot get the Queue from the provider");
return null;
}
- }
-
- synchronized void changeNumListening(int val)
- {
- Log.log("Connection: changeNumListening("+((val>0)?"+)":"-)"));
}
}
1.5 +4 -3 spyderMQ/src/java/org/spyderMQ/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueReceiver.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyQueueReceiver.java 2000/05/25 01:18:33 1.4
+++ SpyQueueReceiver.java 2000/05/25 01:52:05 1.5
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyQueueReceiver
extends SpyMessageConsumer
@@ -167,11 +167,12 @@
//---
- void setListening(boolean newvalue)
+ void setListening(boolean newvalue) throws JMSException
{
if (newvalue==listening) return;
listening=newvalue;
- ((SpyQueueSession)session).notifyReceiverStopped(this,listening);
+ if (listening) sessionQueue.changeNumListening(1);
+ else sessionQueue.changeNumListening(-1);
}
}
1.10 +2 -9 spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- SpyQueueSession.java 2000/05/25 01:18:34 1.9
+++ SpyQueueSession.java 2000/05/25 01:52:05 1.10
@@ -23,7 +23,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SpyQueueSession
extends SpySession
@@ -34,7 +34,7 @@
SpyQueueSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
{
- super(myConnection,transacted,acknowledgeMode,stop,false);
+ super(myConnection,transacted,acknowledgeMode,stop);
}
// Public --------------------------------------------------------
@@ -146,12 +146,5 @@
thread.notify();
}
}
-
- //One receiver is changing its mode
- synchronized void notifyReceiverStopped(SpyQueueReceiver receiver,boolean mode)
- {
- Log.log("Session:
notifyReceiverStopped(receiver="+receiver+",mode="+mode+")");
- receiver.sessionQueue.changeNumListening(mode?1:-1);
- }
}
1.18 +3 -6 spyderMQ/src/java/org/spyderMQ/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpySession.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- SpySession.java 2000/05/25 01:18:34 1.17
+++ SpySession.java 2000/05/25 01:52:05 1.18
@@ -28,7 +28,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.17 $
+ * @version $Revision: 1.18 $
*/
public class SpySession
implements Runnable, Session
@@ -56,12 +56,10 @@
boolean closed;
//This object is the object used to synchronize the session's thread
public Integer thread;
- //Am I linked to a topic
- public boolean isTopic;
// Constructor ---------------------------------------------------
- SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop,
boolean isTopic)
+ SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop)
{
connection=conn;
transacted=trans;
@@ -73,7 +71,6 @@
messageListener=null;
closed=false;
thread=new Integer(0);
- this.isTopic=isTopic;
//Start one thread for each session
Thread oneThread=new Thread(this);
@@ -384,7 +381,7 @@
synchronized (destinations) {
SessionQueue sub=(SessionQueue)destinations.get(dest);
if (sub==null) {
- sub=new SessionQueue(this);
+ sub=new SessionQueue(this,dest);
sub.addConsumer(who);
HashMap newDestinations=(HashMap)destinations.clone();
newDestinations.put(dest,sub);
1.25 +2 -2 spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -r1.24 -r1.25
--- SpyTopicSession.java 2000/05/24 19:17:18 1.24
+++ SpyTopicSession.java 2000/05/25 01:52:05 1.25
@@ -24,7 +24,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.24 $
+ * @version $Revision: 1.25 $
*/
public class SpyTopicSession
extends SpySession
@@ -35,7 +35,7 @@
SpyTopicSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
{
- super(myConnection,transacted,acknowledgeMode,stop,true);
+ super(myConnection,transacted,acknowledgeMode,stop);
}
// Public --------------------------------------------------------