User: hiram
Date: 00/12/23 18:34:46
Modified: src/java/org/spydermq SpyMessageConsumer.java SpyQueueReceiver.java SpySession.java SpyConnectionConsumer.java
Log:
More ConnectionConsumer Fixes
Revision Changes Path
1.10 +1 -2 spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- SpyMessageConsumer.java 2000/12/24 01:55:08 1.9
+++ SpyMessageConsumer.java 2000/12/24 02:34:46 1.10
@@ -24,7 +24,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
@@ -63,6 +63,7 @@
if (subscription.receiving)
throw new JMSException("This MessageConsumer is waiting in receive() !");
+ subscription.listening = listener!=null;
messageListener = listener;
}
@@ -74,9 +75,7 @@
if (messageListener != null)
throw new JMSException("A message listener is already registered");
- Log.log("Subscription="+subscription);
subscription.receiving = true;
- Log.log("Subscription assignment finished");
if ( this instanceof SpyQueueReceiver ||
subscription.durableSubscriptionName!=null )
session.connection.receive(subscription, 0);
1.10 +4 -11 spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- SpyQueueReceiver.java 2000/12/24 01:55:08 1.9
+++ SpyQueueReceiver.java 2000/12/24 02:34:46 1.10
@@ -18,7 +18,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SpyQueueReceiver extends SpyMessageConsumer implements QueueReceiver {
// Attributes ----------------------------------------------------
@@ -36,20 +36,13 @@
}
-
public void setMessageListener(MessageListener listener) throws JMSException {
+
+ boolean change = (listener!=null) != subscription.listening;
super.setMessageListener(listener);
- setListening(listener != null);
- }
-
- //---
- void setListening(boolean newvalue) throws JMSException {
- if (newvalue == subscription.listening)
- return;
- subscription.listening = newvalue;
-
- if (queue != null)
+ if (change && queue != null)
session.connection.listenerChange(subscription.subscriptionId,
subscription.listening);
+
}
// Constructor ---------------------------------------------------
1.19 +5 -7 spyderMQ/src/java/org/spydermq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- SpySession.java 2000/12/24 01:55:08 1.18
+++ SpySession.java 2000/12/24 02:34:46 1.19
@@ -34,7 +34,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.18 $
+ * @version $Revision: 1.19 $
*/
abstract public class SpySession
implements Runnable, Session, XASession
@@ -45,8 +45,7 @@
protected boolean transacted;
//What is the type of acknowledgement ?
protected int acknowledgeMode;
- //The messageListener for this session
- private MessageListener messageListener;
+
//The connection object to which this session is linked
public SpyConnection connection;
// This consumer is the consumer that receives messages for the MessageListener
@@ -82,7 +81,6 @@
spyXAResource = new SpyXAResource(this);
mutex = new Mutex();
- messageListener=null;
closed=false;
consumers = new HashSet();
@@ -196,16 +194,16 @@
public MessageListener getMessageListener() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
-
- return messageListener;
+ if( sessionConsumer == null )
+ return null;
+ return sessionConsumer.getMessageListener();
}
public void setMessageListener(MessageListener listener) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
- messageListener = listener;
sessionConsumer = new SpyMessageConsumer(this);
-
+ sessionConsumer.setMessageListener(listener);
mutex.notifyLock();
}
1.4 +17 -14 spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java
Index: SpyConnectionConsumer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyConnectionConsumer.java 2000/12/24 01:55:09 1.3
+++ SpyConnectionConsumer.java 2000/12/24 02:34:46 1.4
@@ -19,7 +19,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer,
SpyConsumer {
@@ -89,24 +89,27 @@
Log.log(""+this+"->processMessages()");
- Iterator iter = queue.iterator();
- while (iter.hasNext()) {
- SpyMessage message = (SpyMessage) iter.next();
-
- ServerSession serverSession = serverSessionPool.getServerSession();
- SpySession spySession = (SpySession) serverSession.getSession();
-
- if (spySession.sessionConsumer == null) {
- Log.log(""+this+" Session did not have a set MessageListner");
- throw new JMSException("Session did not have a set MessageListner");
- }
+ synchronized (queue) {
+ Iterator iter = queue.iterator();
+ while (iter.hasNext()) {
+ SpyMessage message = (SpyMessage) iter.next();
+ iter.remove();
+
+ ServerSession serverSession = serverSessionPool.getServerSession();
+ SpySession spySession = (SpySession) serverSession.getSession();
+
+ if (spySession.sessionConsumer == null) {
+ Log.log(""+this+" Session did not have a set MessageListner");
+ throw new JMSException("Session did not have a set MessageListner");
+ }
- spySession.sessionConsumer.addMessage(message);
- spySession.sessionConsumer.subscription = subscription;
+ spySession.sessionConsumer.addMessage(message);
+ spySession.sessionConsumer.subscription = subscription;
- Log.log(""+this+" Starting the ServerSession.");
- serverSession.start();
+ Log.log(""+this+" Starting the ServerSession.");
+ serverSession.start();
+ }
}
}