User: hiram
Date: 00/12/23 17:55:08
Modified: src/java/org/spydermq/server SharedQueue.java
PersistenceManager.java JMSServer.java
JMSDestination.java ExclusiveQueue.java
ClientConsumer.java
Added: src/java/org/spydermq/server BasicQueue.java
Log:
ConnectionConsumer fixes and server synchronization optimizations.
Spyder should now work with the ASF implementation Peter did.
Revision Changes Path
1.2 +3 -62 spyderMQ/src/java/org/spydermq/server/SharedQueue.java
Index: SharedQueue.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/SharedQueue.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SharedQueue.java 2000/12/23 15:48:25 1.1
+++ SharedQueue.java 2000/12/24 01:55:06 1.2
@@ -26,9 +26,9 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
-public class SharedQueue implements Task, AbstractQueue {
+public class SharedQueue extends BasicQueue {
//List of Pending messages
private TreeSet messages;
@@ -40,77 +40,18 @@
// Constructor ---------------------------------------------------
SharedQueue(JMSServer server) throws JMSException
{
- this.server=server;
+ super( server );
consumers=new LinkedList();
messages=new TreeSet();
}
- public void addMessage(SpyMessage mes, Long txId) throws JMSException
- {
-
- // This task gets run to make the message visible in the queue.
- class AddMessagePostCommitTask implements Runnable {
- SpyMessage message;
-
- AddMessagePostCommitTask(SpyMessage m) {
- message = m;
- }
-
- public void run() {
- synchronized (messages)
- {
-
- //Add the message to the queue
- messages.add(message);
- notifyMessageAvailable();
-
- }
- }
- }
-
- // The message gets added to the queue after the transaction
- // commits (if the message was transacted)
- Runnable task = new AddMessagePostCommitTask(mes);
- if( txId == null ) {
- task.run();
- } else {
- server.persistenceManager.addPostCommitTask(txId, task);
- }
-
- }
-
- // Package protected ---------------------------------------------
- public void addConsumer(ClientConsumer consumer) throws JMSException
- {
- //We want to avoid removeSubscriber, addSubscriber or sendOneMessage
to work concurently
- synchronized (consumers) {
- consumers.add(consumer);
- }
- }
-
- public void notifyMessageAvailable() {
-
- synchronized (server.taskQueue) {
- server.taskQueue.addLast(this);
- server.taskQueue.notify();
- }
-
- }
-
- public void removeConsumer(ClientConsumer consumer) throws JMSException
- {
- synchronized (consumers) {
- consumers.remove(consumer);
- }
- }
// This will dispatch messages in the queue the the ClientConsumers
- synchronized public void run() throws JMSException
+ public void run() throws JMSException
{
SpyMessage[] job;
synchronized (messages) {
-
if( messages.size() == 0 )
return;
1.4 +0 -0 spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/PersistenceManager.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- PersistenceManager.java 2000/12/23 15:48:24 1.3
+++ PersistenceManager.java 2000/12/24 01:55:06 1.4
@@ -26,7 +26,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class PersistenceManager {
1.6 +2 -2 spyderMQ/src/java/org/spydermq/server/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- JMSServer.java 2000/12/23 15:48:24 1.5
+++ JMSServer.java 2000/12/24 01:55:07 1.6
@@ -27,7 +27,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class JMSServer
implements Runnable, JMSServerMBean
@@ -497,10 +497,10 @@
}
- public void restoreMessage(SpyMessage message, String queueId) throws
JMSException
+ public void restoreMessage(SpyMessage message, String queueId)
{
JMSDestination
queue=(JMSDestination)messageQueue.get(message.jmsDestination);
- if (queue==null) throw new JMSException("This destination does not
exist !");
+ if (queue==null) throw new RuntimeException("This destination does not
exist!");
//Add the message to the queue
queue.restoreMessage(message, queueId);
}
1.2 +3 -6 spyderMQ/src/java/org/spydermq/server/JMSDestination.java
Index: JMSDestination.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSDestination.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JMSDestination.java 2000/12/23 15:48:25 1.1
+++ JMSDestination.java 2000/12/24 01:55:07 1.2
@@ -26,7 +26,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class JMSDestination {
@@ -128,13 +128,10 @@
}
// Package protected ---------------------------------------------
- ExclusiveQueue getExclusiveQueue(String queue) throws JMSException {
+ ExclusiveQueue getExclusiveQueue(String queue) {
- ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
- if( eq == null )
- throw new JMSException("That destination queue does not
exist");
+ return (ExclusiveQueue)exclusiveQueues.get( queue );
- return eq;
}
// Package protected ---------------------------------------------
@@ -169,7 +166,7 @@
}
//Used to put a message that was added previously to the queue, back in the
queue
- public void restoreMessage(SpyMessage mes, String queueId) throws JMSException
+ public void restoreMessage(SpyMessage mes, String queueId)
{
Log.log(""+this+"->restoreMessage(mes="+mes+",queue="+queueId+")");
ExclusiveQueue eq = getExclusiveQueue(queueId);
1.2 +5 -119 spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java
Index: ExclusiveQueue.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ExclusiveQueue.java 2000/12/23 15:48:25 1.1
+++ ExclusiveQueue.java 2000/12/24 01:55:07 1.2
@@ -28,143 +28,29 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
-public class ExclusiveQueue implements Task, AbstractQueue {
+public class ExclusiveQueue extends BasicQueue {
- //List of messages waiting to be dispatched
- TreeSet messages = new TreeSet();
- //The JMSServer object
- JMSServer server;
- //DistributedConnection objs that have "registered" to this Destination
- private LinkedList consumers = new LinkedList();
//The queueId needed to identify this queue with the persistence manager.
String queueId;
-
-
- //Used to put a message that was added previously to the queue, back in the
queue
- public void restoreMessage(SpyMessage mes)
- {
- //restore a message to the message list...
- synchronized (messages) {
- messages.add(mes);
- }
- notifyMessageAvailable();
- }
public void addMessage(SpyMessage mes, Long txId) throws JMSException
{
- Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
- // This task gets run to make the message visible in the queue.
- class AddMessagePostCommitTask implements Runnable {
- SpyMessage message;
-
- AddMessagePostCommitTask(SpyMessage m) {
- message = m;
- }
-
- public void run() {
- //restore a message to the message list...
- synchronized (messages) {
- messages.add(message);
- }
- notifyMessageAvailable();
- }
- }
-
// Persist the message if it was persistent
if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT )
server.persistenceManager.add(queueId, mes, txId);
-
- // The message gets added to the queue after the transaction
- // commits (if the message was transacted)
- Runnable task = new AddMessagePostCommitTask(mes);
- if( txId == null ) {
- task.run();
- } else {
- server.persistenceManager.addPostCommitTask(txId, task);
- }
+
+ super.addMessage(mes, txId);
}
// Constructor ---------------------------------------------------
public ExclusiveQueue(JMSServer server, String queueId) throws JMSException
- {
-
- this.server=server;
- this.queueId = queueId;
-
- }
-
- // synchrnozed so no message dispatching occurs while we add a consumer
- synchronized public void addConsumer(ClientConsumer consumer) throws
JMSException
- {
- //We want to avoid removeSubscriber, addSubscriber or sendOneMessage
to work concurently
- synchronized (consumers) {
- consumers.add(consumer);
- }
- }
-
- public SpyMessage[] browse(String selector) throws JMSException {
-
- if( selector == null ) {
- SpyMessage list[];
- synchronized (messages) {
- list = new SpyMessage[messages.size()];
- list = (SpyMessage [])messages.toArray(list);
- }
- return list;
- } else {
- Selector s = new Selector( selector );
- LinkedList selection=new LinkedList();
-
- synchronized (messages) {
- Iterator i = messages.iterator();
- while( i.hasNext() ) {
- SpyMessage m = (SpyMessage)i.next();
- if( s.test(m) )
- selection.add(m);
- }
- }
-
- SpyMessage list[];
- list = new SpyMessage[selection.size()];
- list = (SpyMessage [])selection.toArray(list);
- return list;
- }
- }
-
- public void notifyMessageAvailable() {
-
- Log.log(""+this+"->notifyMessageAvailable()");
-
- synchronized (server.taskQueue) {
- server.taskQueue.addLast(this);
- server.taskQueue.notify();
- }
-
- }
-
- //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
- public SpyMessage receiveMessage() throws JMSException
- {
- synchronized (messages) {
- if (messages.size()==0)
- return null;
-
- SpyMessage m = (SpyMessage)messages.first();
- messages.remove(m);
-
- return m;
- }
- }
-
- public void removeConsumer(ClientConsumer consumer) throws JMSException
{
- synchronized (consumers) {
- consumers.remove(consumer);
- }
+ super(server);
+ this.queueId = queueId;
}
// Iterate over the consumers asking them to take messages until they stop
1.2 +72 -58 spyderMQ/src/java/org/spydermq/server/ClientConsumer.java
Index: ClientConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ClientConsumer.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ClientConsumer.java 2000/12/23 15:48:25 1.1
+++ ClientConsumer.java 2000/12/24 01:55:07 1.2
@@ -27,7 +27,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class ClientConsumer implements Task {
@@ -67,15 +67,14 @@
RestoreMessageTask(SpyMessage m,int subscriptionId) { message
= m; this.subscriptionId=subscriptionId; }
public void run() {
Log.log("Restoring message: " + message.jmsMessageID);
- String queueId = JMSDestination.DEFAULT_QUEUE_ID;
+ String queueId;
if( message.jmsDestination instanceof SpyTopic ) {
// Still need to implement
- //queueId
- }
- try {
- server.restoreMessage(message,queueId);
- } catch (JMSException ignore ) {
+ queueId = null;
+ } else {
+ queueId = JMSDestination.DEFAULT_QUEUE_ID;
}
+ server.restoreMessage(message,queueId);
}
}
@@ -124,14 +123,13 @@
Subscription s = (Subscription)subs.next();
if( s.accepts( message, false ) ) {
+ ReceiveRequest r = new ReceiveRequest();
+ r.message = message;
+
synchronized (messages) {
-
- ReceiveRequest r = new ReceiveRequest();
- r.message = message;
-
messages.add(r);
-
}
+
return;
}
}
@@ -146,18 +144,24 @@
req.dc = dc;
synchronized (subscriptions ) {
-
- subscriptions.put(new Integer(req.subscriptionId), req );
+
+ HashMap subscriptionsClone = (HashMap)subscriptions.clone();
+ subscriptionsClone.put(new Integer(req.subscriptionId), req );
+ subscriptions = subscriptionsClone;
LinkedList ll = (LinkedList)destinationSubscriptions.get(
req.destination );
if( ll == null ) {
- ll = new LinkedList();
-
- destinationSubscriptions.put(req.destination, ll );
-
+
JMSDestination
queue=(JMSDestination)server.getJMSDestination(req.destination);
if (queue==null) throw new JMSException("This
destination does not exist !");
+ ll = new LinkedList();
+ ll.add( req );
+
+ HashMap destinationSubscriptionsClone =
(HashMap)destinationSubscriptions.clone();
+ destinationSubscriptionsClone.put(req.destination, ll
);
+ destinationSubscriptions =
destinationSubscriptionsClone;
+
if( queue.isTopic ) {
if( req.durableSubscriptionName!=null ) {
//
queue.addExclusiveConsumer(dc.getClientID(), this);
@@ -167,9 +171,14 @@
} else {
queue.addExclusiveConsumer(queue.DEFAULT_QUEUE_ID, this);
}
+ } else {
+ LinkedList llClone = (LinkedList)ll.clone();
+ llClone.add( req );
+
+ HashMap destinationSubscriptionsClone =
(HashMap)destinationSubscriptions.clone();
+ destinationSubscriptions.put(req.destination, llClone);
+ destinationSubscriptions =
destinationSubscriptionsClone;
}
-
- ll.add( req );
}
}
@@ -191,7 +200,7 @@
}
synchronized (unacknowledgedMessages) {
- Iterator i = unacknowledgedMessages.keySet().iterator();
+ Iterator i =
((HashMap)unacknowledgedMessages.clone()).keySet().iterator();
while( i.hasNext() ) {
AcknowledgementRequest item =
(AcknowledgementRequest)i.next();
@@ -292,17 +301,26 @@
Subscription req;
synchronized (subscriptions ) {
- req = (Subscription)subscriptions.remove(new
Integer(subscriptionId));
-
+ HashMap subscriptionsClone = (HashMap)subscriptions.clone();
+ req = (Subscription)subscriptionsClone.remove(new
Integer(subscriptionId));
+ subscriptions = subscriptionsClone;
+
if( req == null )
throw new JMSException("The subscription had not been
previously registered");
+
LinkedList ll = (LinkedList)destinationSubscriptions.get(
req.destination );
if( ll == null )
throw new JMSException("The subscription was not
registered with the destination");
-
- ll.remove( req );
- if( ll.size() != 0 )
+
+ LinkedList llClone = (LinkedList)ll.clone();
+ llClone.remove( req );
+
+ HashMap destinationSubscriptionsClone =
(HashMap)destinationSubscriptions.clone();
+ destinationSubscriptionsClone.put( req.destination, llClone );
+ destinationSubscriptions = destinationSubscriptionsClone;
+
+ if( llClone.size() != 0 )
return;
// There is no subscriber for the destination at this point
@@ -358,47 +376,43 @@
while( i.hasNext() ) {
SpyMessage message = (SpyMessage)i.next();
+
+ LinkedList l = (LinkedList)destinationSubscriptions.get(
message.getJMSDestination() );
+ if( l == null ) return false;
- synchronized (subscriptions) {
+ Iterator subs = l.iterator();
+ while( subs.hasNext() ) {
- LinkedList l =
(LinkedList)destinationSubscriptions.get( message.getJMSDestination() );
- if( l == null )
- throw new JMSException("No subscription found
for that destination.");
+ Subscription s = (Subscription)subs.next();
+ if( s.accepts( message, true ) ) {
- Iterator subs = l.iterator();
-
- while( subs.hasNext() ) {
+ s.receiving = false;
+ i.remove();
- Subscription s = (Subscription)subs.next();
- if( s.accepts( message, true ) ) {
-
- s.receiving = false;
- i.remove();
-
- synchronized (messages) {
-
- ReceiveRequest r = new
ReceiveRequest();
- r.message = message;
- r.subscriptionId = new
Integer(s.subscriptionId);
-
- messages.add(r);
-
- AcknowledgementRequest ack =
new AcknowledgementRequest();
- ack.destination =
message.getJMSDestination();
- ack.messageID =
message.getJMSMessageID();
- ack.subscriberId =
s.subscriptionId;
- ack.isAck = false;
-
unacknowledgedMessages.put(ack, message);
-
- }
- notifyMessageAvailable();
+ ReceiveRequest r = new ReceiveRequest();
+ r.message = message;
+ r.subscriptionId = new
Integer(s.subscriptionId);
+
+ synchronized (messages) {
+ messages.add(r);
+ }
+
+ AcknowledgementRequest ack = new
AcknowledgementRequest();
+ ack.destination = message.getJMSDestination();
+ ack.messageID = message.getJMSMessageID();
+ ack.subscriberId = s.subscriptionId;
+ ack.isAck = false;
- return true;
-
+ synchronized (unacknowledgedMessages) {
+ unacknowledgedMessages.put(ack,
message);
}
+
+ notifyMessageAvailable();
+ return true;
+
}
-
}
+
}
return false;
1.1 spyderMQ/src/java/org/spydermq/server/BasicQueue.java
Index: BasicQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.DeliveryMode;
import org.spydermq.*;
import org.spydermq.persistence.SpyMessageLog;
import org.spydermq.selectors.Selector;
import java.util.Iterator;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.TreeSet;
/**
* This class represents a queue which provides it's messages
* exclusivly to one consumer at a time.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
abstract public class BasicQueue implements Task, AbstractQueue {
//List of messages waiting to be dispatched
TreeSet messages = new TreeSet();
//The JMSServer object
JMSServer server;
//DistributedConnection objs that have "registered" to this Destination
LinkedList consumers = new LinkedList();
// Constructor ---------------------------------------------------
public BasicQueue(JMSServer server) throws JMSException
{
this.server=server;
}
//Used to put a message that was added previously to the queue, back in the
queue
public void restoreMessage(SpyMessage mes)
{
//restore a message to the message list...
synchronized (messages) {
messages.add(mes);
}
notifyMessageAvailable();
}
public void addMessage(SpyMessage mes, Long txId) throws JMSException
{
Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
// This task gets run to make the message visible in the queue.
class AddMessagePostCommitTask implements Runnable {
SpyMessage message;
AddMessagePostCommitTask(SpyMessage m) {
message = m;
}
public void run() {
//restore a message to the message list...
synchronized (messages) {
messages.add(message);
}
notifyMessageAvailable();
}
}
// The message gets added to the queue after the transaction
// commits (if the message was transacted)
Runnable task = new AddMessagePostCommitTask(mes);
if( txId == null ) {
task.run();
} else {
server.persistenceManager.addPostCommitTask(txId, task);
}
}
//
public void addConsumer(ClientConsumer consumer) throws JMSException
{
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage
to work concurently
synchronized (consumers) {
LinkedList consumersClone = (LinkedList)consumers.clone();
consumersClone.add(consumer);
consumers = consumersClone;
}
}
public SpyMessage[] browse(String selector) throws JMSException {
if( selector == null ) {
SpyMessage list[];
synchronized (messages) {
list = new SpyMessage[messages.size()];
list = (SpyMessage [])messages.toArray(list);
}
return list;
} else {
Selector s = new Selector( selector );
LinkedList selection=new LinkedList();
synchronized (messages) {
Iterator i = messages.iterator();
while( i.hasNext() ) {
SpyMessage m = (SpyMessage)i.next();
if( s.test(m) )
selection.add(m);
}
}
SpyMessage list[];
list = new SpyMessage[selection.size()];
list = (SpyMessage [])selection.toArray(list);
return list;
}
}
public void notifyMessageAvailable() {
Log.log(""+this+"->notifyMessageAvailable()");
synchronized (server.taskQueue) {
server.taskQueue.addLast(this);
server.taskQueue.notify();
}
}
//Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
public SpyMessage receiveMessage() throws JMSException
{
synchronized (messages) {
if (messages.size()==0)
return null;
SpyMessage m = (SpyMessage)messages.first();
messages.remove(m);
return m;
}
}
public void removeConsumer(ClientConsumer consumer) throws JMSException
{
synchronized (consumers) {
LinkedList consumersClone = (LinkedList)consumers.clone();
consumersClone.remove(consumer);
consumers = consumersClone;
}
}
}