User: norbert
Date: 00/05/31 16:22:55
Modified: src/java/org/spydermq ConnectionQueue.java
JMSServerQueue.java SpyConnection.java
Log:
P2P system ( start() and stop() fixed )
Revision Changes Path
1.2 +24 -2 spyderMQ/src/java/org/spydermq/ConnectionQueue.java
Index: ConnectionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/ConnectionQueue.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ConnectionQueue.java 2000/05/31 18:06:40 1.1
+++ ConnectionQueue.java 2000/05/31 23:22:55 1.2
@@ -16,7 +16,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class ConnectionQueue
{
@@ -62,8 +62,10 @@
{
NumListeningSessions+=val;
- Log.log("ConnectionQueue:
changeNumListening(sessions="+NumListeningSessions+")");
+ Log.log("ConnectionQueue: changeNumListening(listening
sessions="+NumListeningSessions+")");
+ if (connection.modeStop) return;
+
try {
if (val==-1&&NumListeningSessions==0) {
@@ -77,4 +79,24 @@
}
}
+
+ synchronized void start() throws JMSException
+ {
+ try {
+ if (NumListeningSessions!=0)
connection.provider.connectionListening(true,destination,connection.distributedConnection);
+ } catch (Exception e) {
+ connection.failureHandler(e,"Cannot contact the JMS server");
+ }
+ }
+
+ synchronized void stop() throws JMSException
+ {
+ try {
+ if (NumListeningSessions!=0)
connection.provider.connectionListening(false,destination,connection.distributedConnection);
+ } catch (Exception e) {
+ connection.failureHandler(e,"Cannot contact the JMS server");
+ }
+ }
+
}
+
1.2 +10 -9 spyderMQ/src/java/org/spydermq/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JMSServerQueue.java 2000/05/31 18:06:40 1.1
+++ JMSServerQueue.java 2000/05/31 23:22:55 1.2
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class JMSServerQueue
{
@@ -275,7 +275,7 @@
if (dc.listeners!=0) break;
}
if (dc==null||dc.listeners==0) {
- listeners=0;
+ //listeners=0;
Log.error("WARNING: The listeners count was
invalid !");
break;
}
@@ -317,18 +317,19 @@
if (mode) {
distributedConnection.listeners++;
- listeners++;
-
- if (listeners==1&&!threadWorking)
- synchronized (messages) {
- if (!messages.isEmpty()) notifyWorkers();
- }
-
+ listeners++;
} else {
distributedConnection.listeners--;
listeners--;
}
+
+ if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue) {
+ synchronized (messages) {
+ if (!messages.isEmpty()) notifyWorkers();
+ }
+ }
+
Log.log("Listeners for "+destination+" = "+listeners);
}
1.2 +17 -3 spyderMQ/src/java/org/spydermq/SpyConnection.java
Index: SpyConnection.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyConnection.java 2000/05/31 18:06:43 1.1
+++ SpyConnection.java 2000/05/31 23:22:55 1.2
@@ -29,7 +29,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyConnection
implements Connection, Serializable
@@ -132,7 +132,14 @@
if (!modeStop) return;
modeStop=false;
+ Iterator i=destinations.values().iterator();
+ while (i.hasNext()) {
+ ConnectionQueue cq=(ConnectionQueue)i.next();
+ cq.start();
+ }
+
changeModeStop(modeStop);
+
}
public void stop() throws JMSException
@@ -140,10 +147,17 @@
if (closed) throw new IllegalStateException("The connection is
closed");
if (distributedConnection==null) createReceiver();
- if (modeStop) return;
- modeStop=true;
+ if (modeStop) return;
+ modeStop=true;
+ Iterator i=destinations.values().iterator();
+ while (i.hasNext()) {
+ ConnectionQueue cq=(ConnectionQueue)i.next();
+ cq.stop();
+ }
+
changeModeStop(modeStop);
+
}
public synchronized void close() throws JMSException