User: norbert
Date: 00/05/31 18:14:30
Modified: src/java/org/spydermq JMSServerQueue.java SessionQueue.java
SpyMessageConsumer.java SpyQueueReceiver.java
SpySession.java SpyTopicSubscriber.java
Log:
P2P system : fix for faulty clients
Revision Changes Path
1.5 +9 -6 spyderMQ/src/java/org/spydermq/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- JMSServerQueue.java 2000/06/01 00:12:12 1.4
+++ JMSServerQueue.java 2000/06/01 01:14:29 1.5
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class JMSServerQueue
{
@@ -82,9 +82,12 @@
if (distributedConnection==null) return;
if (distributedConnection.listeners) listeners--;
- if (i==null) subscribers.remove(dc);
- else i.remove();
-
+ if (i==null) {
+ if (subscribers.remove(dc.getClientID())==null)
Log.notice("WARNING: Could not remove "+dc.getClientID());
+ } else {
+ i.remove();
+ }
+
}
}
@@ -196,7 +199,7 @@
try {
dc.cr.receiveMultiple(destination,mes);
} catch (Exception e) {
- Log.error("Cannot deliver those messages to
the client "+dc);
+ Log.error("Cannot deliver those messages to
the client "+dc.getClientID());
Log.error(e);
handleConnectionFailure(dc,i);
}
@@ -227,7 +230,7 @@
private void handleConnectionFailure(SpyDistributedConnection dc,Iterator i)
{
//We should try again :) This behavior should under control of a
Failure-Plugin
- Log.error("I remove this Connection from the subscribers list");
+ Log.error("I remove the Connection "+dc.getClientID()+" from the
subscribers list");
//Call JMSServer.ConnectionClosing(), but ask him not to check my list.
server.connectionClosing(dc,this);
1.2 +6 -10 spyderMQ/src/java/org/spydermq/SessionQueue.java
Index: SessionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SessionQueue.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SessionQueue.java 2000/05/31 18:06:42 1.1
+++ SessionQueue.java 2000/06/01 01:14:29 1.2
@@ -21,7 +21,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SessionQueue
{
@@ -175,12 +175,13 @@
}
}
-
+
+ //Called by the ConnectionReceiver which has just received a message - in the
Queue case only
public void dispatchMessage(Destination dest, SpyMessage mes) throws
JMSException
{
Log.log("SessionQueue:
dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
- if (session.closed) throw new IllegalStateException("The session is
closed");
+ if (session.closed) throw new NoReceiverException("The session is
closed");
if (NumListeningSubscribers==0) throw new NoReceiverException("There
are no receivers for this destination !");
if (mes.isOutdated()) return;
@@ -191,9 +192,8 @@
if (receiver.listening) break;
}
if (receiver==null||!receiver.listening) {
- NumListeningSubscribers=0;
- Log.error("WARNING: The listeners count was invalid !");
- throw new NoReceiverException("There are no receivers for this
destination !");
+ Log.error("FIXME: The listeners count was invalid !");
+ throw new NoReceiverException("The listeners count was invalid
!");
}
synchronized (receiver.messages) {
@@ -203,16 +203,12 @@
receiver.addMessage(mes);
receiver.messages.notify();
} else {
- receiver.addMessage(mes);
receiver.messageListener.onMessage(mes);
}
}
-
}
-
-
}
1.2 +13 -6 spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyMessageConsumer.java 2000/05/31 18:06:44 1.1
+++ SpyMessageConsumer.java 2000/06/01 01:14:29 1.2
@@ -20,7 +20,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyMessageConsumer
implements MessageConsumer
@@ -80,30 +80,37 @@
public void setMessageListener(MessageListener listener) throws JMSException
{
+ if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ if (waitInReceive) throw new JMSException("This MessageConsumer is
waiting in receive() !");
//Job is done in the inherited classes
- //The QueueReceiver object need to notify their session / connection /
the broker
- throw new RuntimeException("pure virtual call");
+ //The QueueReceiver object need to notify their session / connection /
the broker
}
public Message receive() throws JMSException
{
+ if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ if (messageListener!=null) throw new JMSException("A message listener
is already registered");
//Job is done in the inherited classes
//The QueueReceiver object need to notify their session / connection /
the broker
- throw new RuntimeException("pure virtual call");
+ return null;
}
public Message receive(long timeOut) throws JMSException
{
+ if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ if (messageListener!=null) throw new JMSException("A message listener
is already registered");
//Job is done in the inherited classes
//The QueueReceiver object need to notify their session / connection /
the broker
- throw new RuntimeException("pure virtual call");
+ return null;
}
public Message receiveNoWait() throws JMSException
{
+ if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ if (messageListener!=null) throw new JMSException("A message listener
is already registered");
//Job is done in the inherited classes
//The QueueReceiver object need to notify their session / connection /
the broker
- throw new RuntimeException("pure virtual call");
+ return null;
}
public synchronized void close() throws JMSException
1.2 +6 -5 spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyQueueReceiver.java 2000/05/31 18:06:45 1.1
+++ SpyQueueReceiver.java 2000/06/01 01:14:29 1.2
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyQueueReceiver
extends SpyMessageConsumer
@@ -61,7 +61,7 @@
public Message receive() throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ super.receive();
setListening(true);
@@ -102,7 +102,7 @@
public Message receive(long timeOut) throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ super.receive(timeOut);
if (timeOut==0) return receive();
long endTime=(new Date()).getTime()+timeOut;
@@ -151,7 +151,8 @@
public Message receiveNoWait() throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ super.receiveNoWait();
+
if (session.modeStop) return null;
return session.connection.queueReceiveNoWait(queue);
@@ -159,7 +160,7 @@
public void setMessageListener(MessageListener listener) throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ super.setMessageListener(listener);
messageListener=listener;
setListening(listener!=null);
1.2 +3 -2 spyderMQ/src/java/org/spydermq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpySession.java 2000/05/31 18:06:45 1.1
+++ SpySession.java 2000/06/01 01:14:29 1.2
@@ -28,7 +28,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpySession
implements Runnable, Session
@@ -262,9 +262,10 @@
}
+ //Called by the ConnectionReceiver which has just received a message
public void dispatchMessage(Destination dest, SpyMessage mes) throws
JMSException
{
- //The job is done in inherited classes
+ //The job is done in inherited classes - in the SPyTopicSession only
throw new RuntimeException("pure virtual call");
}
1.2 +5 -5 spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
Index: SpyTopicSubscriber.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyTopicSubscriber.java 2000/05/31 18:06:47 1.1
+++ SpyTopicSubscriber.java 2000/06/01 01:14:29 1.2
@@ -20,7 +20,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyTopicSubscriber
extends SpyMessageConsumer
@@ -77,7 +77,7 @@
public Message receive() throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ super.receive();
synchronized (messages) {
@@ -108,7 +108,7 @@
public Message receive(long timeOut) throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ super.receive(timeOut);
if (timeOut==0) return receive();
@@ -147,7 +147,7 @@
public Message receiveNoWait() throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ super.receiveNoWait();
synchronized (messages) {
@@ -161,7 +161,7 @@
public void setMessageListener(MessageListener listener) throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+ super.setMessageListener(listener);
messageListener=listener;