User: hiram
Date: 00/12/23 17:55:10
Modified: src/java/org/spydermq SpyTopicSubscriber.java
SpyTopicSession.java SpySession.java
SpyQueueSession.java SpyQueueReceiver.java
SpyMessageConsumer.java SpyConnectionConsumer.java
AcknowledgementRequest.java
Log:
ConnectionConsumer fixes and server synchronization optimizations.
Spyder should now work with the ASF implementation Peter did.
Revision Changes Path
1.10 +1 -1 spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
Index: SpyTopicSubscriber.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- SpyTopicSubscriber.java 2000/12/23 15:48:16 1.9
+++ SpyTopicSubscriber.java 2000/12/24 01:55:08 1.10
@@ -20,7 +20,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SpyTopicSubscriber
extends SpyMessageConsumer
@@ -49,7 +49,7 @@
SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean noLocal, String
selector, String durableSubscriptionName)
{
- super(session, (SpyTopic)topic);
+ super(session);
this.topic=topic;
subscription.destination = (SpyDestination)topic;
1.13 +3 -9 spyderMQ/src/java/org/spydermq/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- SpyTopicSession.java 2000/12/23 15:48:15 1.12
+++ SpyTopicSession.java 2000/12/24 01:55:08 1.13
@@ -33,7 +33,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public class SpyTopicSession
extends SpySession
@@ -73,7 +73,7 @@
if (closed) throw new IllegalStateException("The session is closed");
SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal,
messageSelector, null);
- addConsumer(topic,sub);
+ addConsumer(sub);
return sub;
}
@@ -83,7 +83,7 @@
if (closed) throw new IllegalStateException("The session is closed");
SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,false, null,
name);
- addConsumer(topic,sub);
+ addConsumer(sub);
return sub;
@@ -94,7 +94,7 @@
if (closed) throw new IllegalStateException("The session is closed");
SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal,
messageSelector, name);
- addConsumer(topic,sub);
+ addConsumer(sub);
return sub;
}
@@ -124,11 +124,5 @@
return this;
}
- public void setMessageListener(MessageListener listener) throws JMSException
- {
-
- super.setMessageListener(listener);
- sessionConsumer = new SpyTopicSubscriber(this, null, false,null,null);
- }
}
1.18 +24 -32 spyderMQ/src/java/org/spydermq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- SpySession.java 2000/12/23 15:48:15 1.17
+++ SpySession.java 2000/12/24 01:55:08 1.18
@@ -34,7 +34,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.17 $
+ * @version $Revision: 1.18 $
*/
abstract public class SpySession
implements Runnable, Session, XASession
@@ -204,7 +204,8 @@
{
if (closed) throw new IllegalStateException("The session is closed");
messageListener = listener;
-
+ sessionConsumer = new SpyMessageConsumer(this);
+
mutex.notifyLock();
}
@@ -380,37 +381,6 @@
return connection.getNewMessageID();
}
-
-
- void removeConsumer(Destination dest, SpyMessageConsumer who) throws
JMSException
- {
- Log.log("Session:
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
-
- synchronized (connection) {
- connection.removeConsumer( who );
- }
-
- consumers.remove( who );
- }
-
- void addConsumer(Destination dest, SpyMessageConsumer who) throws JMSException
- {
- if (closed) throw new IllegalStateException("The session is closed");
-
- Log.log("Session:
subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
-
- synchronized (consumers) {
- consumers.add( who );
- }
-
- connection.addConsumer(who);
-
- }
-
-
-
-
-
//called by a MessageProducer object which needs to publish a message
void sendMessage(SpyMessage m) throws JMSException {
if (closed)
@@ -440,5 +410,27 @@
modeStop=newValue;
mutex.notifyLock();
+ }
+
+ void addConsumer(SpyMessageConsumer who) throws JMSException
+ {
+ if (closed) throw new IllegalStateException("The session is closed");
+
+ synchronized (consumers) {
+ consumers.add( who );
+ }
+
+ connection.addConsumer(who);
+
+ }
+
+ void removeConsumer(SpyMessageConsumer who) throws JMSException
+ {
+
+ synchronized (connection) {
+ connection.removeConsumer( who );
+ }
+
+ consumers.remove( who );
}
}
1.11 +2 -8 spyderMQ/src/java/org/spydermq/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- SpyQueueSession.java 2000/12/23 15:48:15 1.10
+++ SpyQueueSession.java 2000/12/24 01:55:08 1.11
@@ -29,7 +29,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.10 $
+ * @version $Revision: 1.11 $
*/
public class SpyQueueSession
extends SpySession
@@ -73,7 +73,7 @@
if (closed) throw new IllegalStateException("The session is closed");
SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue,null);
- addConsumer(queue,receiver);
+ addConsumer(receiver);
return receiver;
}
@@ -83,7 +83,7 @@
if (closed) throw new IllegalStateException("The session is closed");
SpyQueueReceiver receiver=new
SpyQueueReceiver(this,queue,messageSelector);
- addConsumer(queue,receiver);
+ addConsumer(receiver);
return receiver;
}
@@ -109,11 +109,5 @@
return this;
}
- public void setMessageListener(MessageListener listener) throws JMSException
- {
-
- super.setMessageListener(listener);
- sessionConsumer = new SpyQueueReceiver(this, null,null);
- }
}
1.9 +3 -3 spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- SpyQueueReceiver.java 2000/12/23 15:48:14 1.8
+++ SpyQueueReceiver.java 2000/12/24 01:55:08 1.9
@@ -18,7 +18,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
public class SpyQueueReceiver extends SpyMessageConsumer implements QueueReceiver {
// Attributes ----------------------------------------------------
@@ -55,10 +55,10 @@
// Constructor ---------------------------------------------------
SpyQueueReceiver(SpyQueueSession session, Queue queue, String selector) {
- super(session, (SpyQueue) queue);
+ super(session);
this.queue = queue;
-
- subscription.durableSubscriptionName = null;
+
+ subscription.destination = (SpyDestination)queue;
subscription.messageSelector = selector;
subscription.durableSubscriptionName = null;
subscription.noLocal = false;
1.9 +13 -23 spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- SpyMessageConsumer.java 2000/12/23 15:48:15 1.8
+++ SpyMessageConsumer.java 2000/12/24 01:55:08 1.9
@@ -24,9 +24,9 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
-abstract public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
+public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
//Link to my session
public SpySession session;
@@ -35,25 +35,12 @@
//Am I closed ?
protected boolean closed;
// The subscription structure should be fill out by the decendent
- Subscription subscription = new Subscription();
+ public Subscription subscription = new Subscription();
//List of Pending messages (not yet delivered)
LinkedList messages;
- //The destination this consumer is getting messages from
- SpyDestination destination;
-
- // Constructor ---------------------------------------------------
-
- SpyMessageConsumer(SpySession s, SpyDestination dest) {
- session = s;
- destination = dest;
- messageListener = null;
- closed = false;
- messages = new LinkedList();
- }
-
// Public --------------------------------------------------------
public String getMessageSelector() throws JMSException {
@@ -86,8 +73,6 @@
throw new IllegalStateException("The MessageConsumer is
closed");
if (messageListener != null)
throw new JMSException("A message listener is already
registered");
- if (destination == null)
- throw new JMSException("No assigned destination.");
Log.log("Subscription="+subscription);
subscription.receiving = true;
@@ -128,8 +113,6 @@
throw new IllegalStateException("The MessageConsumer is
closed");
if (messageListener != null)
throw new JMSException("A message listener is already
registered");
- if (destination == null)
- throw new JMSException("No assigned destination.");
if (timeOut == 0)
return receive();
@@ -183,8 +166,6 @@
throw new IllegalStateException("The MessageConsumer is
closed");
if (messageListener != null)
throw new JMSException("A message listener is already
registered");
- if (destination == null)
- throw new JMSException("No assigned destination.");
subscription.receiving = true;
try {
@@ -214,8 +195,8 @@
if (closed)
return;
- if (destination != null)
- session.removeConsumer(destination, this);
+ if (subscription != null)
+ session.removeConsumer(this);
if ( subscription.receiving && messageListener == null) {
//A consumer could be waiting in receive()
@@ -340,5 +321,14 @@
public Subscription getSubscription() {
return subscription;
+ }
+
+ // Constructor ---------------------------------------------------
+
+ SpyMessageConsumer(SpySession s) {
+ session = s;
+ messageListener = null;
+ closed = false;
+ messages = new LinkedList();
}
}
1.3 +23 -28 spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java
Index: SpyConnectionConsumer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyConnectionConsumer.java 2000/12/23 15:48:15 1.2
+++ SpyConnectionConsumer.java 2000/12/24 01:55:09 1.3
@@ -19,7 +19,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer,
SpyConsumer {
@@ -36,7 +36,7 @@
// Is the ConnectionConsumer closed?
boolean closed;
// The subscription info the consumer
- Subscription subscription;
+ Subscription subscription = new Subscription();
/**
* SpyConnectionConsumer constructor comment.
@@ -53,17 +53,20 @@
subscription.messageSelector = messageSelector;
subscription.durableSubscriptionName = null;
subscription.noLocal = false;
+ subscription.listening = true;
+ subscription.receiving = false;
connection.addConsumer(this);
-
+ connection.listenerChange(subscription.subscriptionId,true);
+
}
public void addMessage(SpyMessage mes) throws JMSException {
+ Log.log(""+this+"->addMessage(mes="+mes+")");
queue.addLast(mes);
}
-
/**
* close method comment.
*/
@@ -80,41 +83,29 @@
public javax.jms.ServerSessionPool getServerSessionPool() throws
javax.jms.JMSException {
return serverSessionPool;
}
-
- public boolean isListening() {
- return true;
- }
-
- public boolean isReceiving() {
- return false;
- }
-
- public void processMessages() throws JMSException {
- ServerSession serverSession = serverSessionPool.getServerSession();
- SpySession spySession = (SpySession) serverSession.getSession();
- if (spySession.sessionConsumer == null)
- throw new JMSException("Session did not have a set
MessageListner");
+
+ public void processMessages() throws JMSException {
- int loadCounter = 0;
+ Log.log(""+this+"->processMessages()");
Iterator iter = queue.iterator();
while (iter.hasNext()) {
-
- loadCounter++;
SpyMessage message = (SpyMessage) iter.next();
- spySession.sessionConsumer.addMessage(message);
- if (loadCounter >= maxMessages) {
+ ServerSession serverSession =
serverSessionPool.getServerSession();
+ SpySession spySession = (SpySession)
serverSession.getSession();
- serverSession.start();
+ if (spySession.sessionConsumer == null) {
+ Log.log(""+this+" Session did not have a set
MessageListner");
+ throw new JMSException("Session did not have a set
MessageListner");
+ }
- serverSession = serverSessionPool.getServerSession();
- spySession = (SpySession) serverSession.getSession();
+ spySession.sessionConsumer.addMessage(message);
+ spySession.sessionConsumer.subscription = subscription;
- if (spySession.sessionConsumer == null)
- throw new JMSException("Session did not have a
set MessageListner");
- }
+ Log.log(""+this+" Starting the ServerSession.");
+ serverSession.start();
}
@@ -122,5 +113,9 @@
public Subscription getSubscription() {
return subscription;
+ }
+
+ public String toString() {
+ return "SpyConnectionConsumer:"+destination;
}
}
1.2 +4 -0 spyderMQ/src/java/org/spydermq/AcknowledgementRequest.java
Index: AcknowledgementRequest.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/AcknowledgementRequest.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- AcknowledgementRequest.java 2000/12/23 15:48:16 1.1
+++ AcknowledgementRequest.java 2000/12/24 01:55:09 1.2
@@ -17,7 +17,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class AcknowledgementRequest
implements java.io.Serializable
@@ -40,5 +40,9 @@
public int hashCode() {
return messageID.hashCode();
+ }
+
+ public String toString() {
+ return
"AcknowledgementRequest:"+(isAck?"ACK":"NACK")+","+destination+","+messageID;
}
}