User: hiram
Date: 00/12/23 07:48:27
Modified: src/java/org/spydermq/server JMSServer.java
PersistenceManager.java SpyderMQService.java
StartServer.java
Added: src/java/org/spydermq/server AbstractQueue.java
ClientConsumer.java ExclusiveQueue.java
JMSDestination.java SharedQueue.java Task.java
Removed: src/java/org/spydermq/server JMSServerQueue.java
JMSServerQueueReceiver.java
Log:
These changes were done to add the following features:
The selector is now evaluated at the server side.
The infrastructure has been laid for durable topic subscriptions.
The QueueBrowser has been implemented.
Queues now can have a Selector.
Revision Changes Path
1.5 +226 -183 spyderMQ/src/java/org/spydermq/server/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- JMSServer.java 2000/12/21 22:33:59 1.4
+++ JMSServer.java 2000/12/23 15:48:24 1.5
@@ -19,6 +19,7 @@
import org.spydermq.*;
import org.spydermq.security.SecurityManager;
+import org.spydermq.xml.XElement;
/**
* This class implements the JMS provider
@@ -26,7 +27,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class JMSServer
implements Runnable, JMSServerMBean
@@ -65,10 +66,15 @@
*/
private boolean stopped = true;
+
+ //The list of ClientConsumers hased by SpyDistributedConnections
+ HashMap clientConsumers = new HashMap();
+ XElement serverConfig;
+
/////////////////////////////////////////////////////////////////////
// Constructors
/////////////////////////////////////////////////////////////////////
- public JMSServer(SecurityManager securityManager)
+ public JMSServer()
{
taskQueue=new LinkedList();
@@ -76,7 +82,7 @@
for(int i=0;i<NB_THREADS;i++)
{
- Thread oneThread=new Thread(this);
+ Thread oneThread=new Thread(this,"JMSServer");
oneThread.setDaemon(true);
oneThread.setName(new Integer(i).toString());
oneThread.start();
@@ -84,8 +90,6 @@
lastID=1;
lastTemporaryTopic=1;
- this.securityManager=securityManager;
-
}
@@ -108,51 +112,50 @@
//This is a correct threading system, but this is not ideal...
//We should let threads cycle through the JMSServerQueue list, and
synchronized on the queue they are working on.
public void run() {
- while (alive) {
- JMSServerQueue queue = null;
- this.stopped = false;
+ while (alive) {
- //Wait (and sleep) until it can find something to do
- synchronized (taskQueue) {
- while (queue == null && alive) {
-
- // size() is O(1) in LinkedList...
- int size=taskQueue.size();
- if (size!=0) {
-
- //<DEBUG>
- queue =
(JMSServerQueue)taskQueue.removeFirst();
-
//queue=(JMSServerQueue)taskQueue.getFirst();
- //</DEBUG>
-
- //One other thread can start
working on the task queue...
- if (size > 1) {
- taskQueue.notify();
- }
- } else {
- try {
- //Log.log("I'm going
to bed...");
- taskQueue.wait(5000);
- //Log.log("I wake up");
- } catch (InterruptedException
e) {
- }
+ Task task = null;
+ this.stopped = false;
+
+ //Wait (and sleep) until it can find something to do
+ synchronized (taskQueue) {
+ while (task == null && alive) {
+
+ // size() is O(1) in LinkedList...
+ int size = taskQueue.size();
+ if (taskQueue.size() != 0) {
+
+ task = (Task) taskQueue.removeFirst();
+
+ //One other thread can start working
on the task queue...
+ if (size > 1) {
+ taskQueue.notify();
}
- }
- }
-
- if (alive) {
- //Ask the queue to do its job
- try {
- queue.doMyJob();
- } catch (JMSException e) {
- Log.error(e);
+ } else {
+
+ try {
+ taskQueue.wait(5000);
+ } catch (InterruptedException e) {
+ }
}
+
}
}
- Log.log("JMS service stopped.");
- this.stopped = true;
+
+ if (!alive) break;
+
+ //Ask the queue to do its job
+ try {
+ task.run();
+ } catch (JMSException e) {
+ Log.error(e);
+ }
+ }
+
+ Log.log("JMS service stopped.");
+ this.stopped = true;
}
public void stopServer() {
@@ -167,7 +170,7 @@
SpyTopic newTopic=new SpyTopic(name);
if (messageQueue.containsKey(newTopic)) throw new JMSException("This
topic already exists !");
- JMSServerQueue queue=new JMSServerQueue(newTopic,null,this);
+ JMSDestination queue=new JMSDestination(newTopic,null,this);
//Add this new JMSServerQueue to the list
synchronized (messageQueue) {
@@ -187,7 +190,7 @@
SpyQueue newQueue=new SpyQueue(name);
if (messageQueue.containsKey(newQueue)) throw new JMSException("This
queue already exists !");
- JMSServerQueue queue=new JMSServerQueue(newQueue,null,this);
+ JMSDestination queue=new JMSDestination(newQueue,null,this);
//Add this new JMSServerQueue to the list
synchronized (messageQueue) {
@@ -198,10 +201,6 @@
return newQueue;
}
-
- // -----------------------------------------
- // Callbacks for the invocation layer ------
- // -----------------------------------------
//Get a new ClientID for a connection
public String getID()
@@ -220,33 +219,18 @@
return ID;
}
- public synchronized SpyTopic createTopic(String name) throws JMSException
- {
- Log.log("createTopic("+name+")");
-
- SpyTopic newTopic=new SpyTopic(name);
- if (!messageQueue.containsKey(newTopic)) throw new JMSException("This
destination does not exist !");
- return newTopic;
- }
-
- public synchronized SpyQueue createQueue(String name) throws JMSException
- {
- Log.log("createQueue("+name+")");
-
- SpyQueue newQueue=new SpyQueue(name);
- if (!messageQueue.containsKey(newQueue)) throw new JMSException("This
destination does not exist !");
- return newQueue;
- }
-
public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection
dc) throws JMSException
{
SpyTemporaryTopic topic=new SpyTemporaryTopic("JMS_TT"+(new
Integer(lastTemporaryTopic++).toString()),dc);
+ ClientConsumer ClientConsumer = getClientConsumer(dc);
synchronized (messageQueue) {
- JMSServerQueue queue=new JMSServerQueue(topic,dc,this);
+
+ JMSDestination queue = new
JMSDestination(topic,ClientConsumer,this);
HashMap newMap=(HashMap)messageQueue.clone();
newMap.put(topic,queue);
messageQueue=newMap;
+
}
return topic;
@@ -254,157 +238,49 @@
public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection
dc) throws JMSException
{
+
SpyTemporaryQueue newQueue=new SpyTemporaryQueue("JMS_TQ"+(new
Integer(lastTemporaryQueue++).toString()),dc);
+ ClientConsumer ClientConsumer = getClientConsumer(dc);
synchronized (messageQueue) {
- JMSServerQueue sessionQueue=new
JMSServerQueue(newQueue,dc,this);
+
+ JMSDestination queue = new
JMSDestination(newQueue,ClientConsumer,this);
HashMap newMap=(HashMap)messageQueue.clone();
- newMap.put(newQueue,sessionQueue);
+ newMap.put(newQueue,queue);
messageQueue=newMap;
+
}
return newQueue;
}
-
- //A connection is closing [error or notification]
- public synchronized void connectionClosing(SpyDistributedConnection
dc,JMSServerQueue noCheck)
- {
- Log.log("connectionClosing(dc="+dc+",noCheck="+noCheck+")");
-
- if (dc==null) return;
-
- //unregister its clientID
- if (dc.getClientID()!=null)
- securityManager.removeID(dc.getClientID());
-
- //Remove the connection from the subscribers list
- synchronized (messageQueue) {
-
- HashMap newMap=null;
- Iterator i=messageQueue.values().iterator();
- boolean modified=false; //don't waste memory
-
- while (i.hasNext()) {
-
- JMSServerQueue sq=(JMSServerQueue)i.next();
-
- if (dc.equals(sq.temporaryDestination)) {
- if (!modified)
newMap=(HashMap)messageQueue.clone();
- newMap.remove(sq.destination);
- modified=true;
- } else {
- if (sq==noCheck) continue;
- sq.connectionClosing(dc);
- }
-
- }
-
- if (modified) messageQueue=newMap;
- }
-
- }
-
- public synchronized void deleteTemporaryDestination(SpyDestination dest)
- {
- Log.log("deleteDestination(dest="+dest.toString()+")");
-
- synchronized (messageQueue) {
- HashMap newMap=(HashMap)messageQueue.clone();
- newMap.remove(dest);
- messageQueue=newMap;
- }
- }
-
public void checkID(String ID) throws JMSException
{
securityManager.addClientID(ID);
}
-
- //Sent by a client to Ack or Nack a message.
- public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item) throws JMSException
- {
- acknowledge(dc, item, null);
- }
-
-
+
//A connection has sent a new message
public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws
JMSException
{
addMessage( dc, val, null);
}
- public void connectionListening(SpyDistributedConnection dc,boolean
mode,Destination dest) throws JMSException
- {
- JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(dest);
- if (serverQueue==null) throw new JMSException("This destination does
not exist !");
-
- serverQueue.connectionListening(mode,dc);
- }
-
public org.spydermq.security.SecurityManager getSecurityManager() {
return securityManager;
}
-
- //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
- public SpyMessage queueReceive(SpyDistributedConnection dc,Queue queue, long
wait) throws JMSException
- {
- Log.log("JMSserver: queueReceive(queue="+queue+",wait="+wait+")");
- JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(queue);
- if (serverQueue==null) throw new JMSException("This destination does
not exist !");
-
- return serverQueue.queueReceive(wait,dc);
-
- }
-
- //A connection object wants to subscribe to a Destination
- public void subscribe(SpyDistributedConnection dc,Destination dest) throws
JMSException
- {
- Log.log("Server:
subscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
-
- JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
- if (queue==null) throw new JMSException("This destination does not
exist !");
- queue.addSubscriber(dc);
- }
-
-
-
- public void unsubscribe(SpyDistributedConnection dc,Destination dest) throws
JMSException
- {
- Log.log("Server:
unsubscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
-
- JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
- if (queue==null) throw new JMSException("This destination does not
exist !");
- queue.removeSubscriber(dc,null);
- }
-
- //Sent by a client to Ack or Nack a message.
- public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item, Long txId) throws JMSException
- {
- JMSServerQueue
serverQueue=(JMSServerQueue)messageQueue.get(item.jmsDestination);
- if (serverQueue==null) throw new JMSException("Destination does not
exist: "+item.jmsDestination);
- serverQueue.acknowledge(dc, item, txId);
- }
//A connection has sent a new message
public void addMessage(SpyDistributedConnection dc, SpyMessage val, Long txId)
throws JMSException
{
Log.notice("INCOMING: (TX="+txId+")"+dc.getClientID()+" =>
"+val.jmsDestination);
- JMSServerQueue
queue=(JMSServerQueue)messageQueue.get(val.jmsDestination);
+ JMSDestination
queue=(JMSDestination)messageQueue.get(val.jmsDestination);
if (queue==null) throw new JMSException("This destination does not
exist !");
//Add the message to the queue
queue.addMessage(val, txId);
}
- public JMSServerQueue getServerQueue(SpyDestination d) throws JMSException
- {
- JMSServerQueue queue=(JMSServerQueue)messageQueue.get(d);
- if (queue==null) throw new JMSException("This destination does not
exist !");
- return queue;
- }
-
/**
* The following function performs a Unit Of Work.
*
@@ -469,5 +345,172 @@
}
+ }
+
+
+
+ //Sent by a client to Ack or Nack a message.
+ public void acknowledge(SpyDistributedConnection dc, AcknowledgementRequest
item) throws JMSException
+ {
+ acknowledge(dc, item, null);
+ }
+
+ //Sent by a client to Ack or Nack a message.
+ public void acknowledge(SpyDistributedConnection dc, AcknowledgementRequest
item, Long txId) throws JMSException
+ {
+
+ ClientConsumer queue = getClientConsumer(dc);
+ queue.acknowledge(item, txId);
+
+ }
+
+ //A connection is closing [error or notification]
+ public synchronized void connectionClosing(SpyDistributedConnection dc) throws
JMSException
+ {
+ Log.log("JMSServer->connectionClosing(dc="+dc+")");
+ if (dc==null) return;
+
+ // Close it's ClientConsumer
+ ClientConsumer cq = (ClientConsumer)clientConsumers.remove( dc );
+ if( cq != null ) {
+ cq.close();
+ }
+
+ //unregister its clientID
+ if (dc.getClientID()!=null)
+ securityManager.removeID(dc.getClientID());
+
+ //Remove any temporary destinations the consumer may have created.
+ synchronized (messageQueue) {
+
+ Iterator i=messageQueue.values().iterator();
+ while (i.hasNext()) {
+
+ JMSDestination sq=(JMSDestination)i.next();
+ if (dc.equals(sq.temporaryDestination)) {
+ i.remove();
+ }
+ }
+
+ }
+
+ }
+
+ public void connectionFailure(SpyDistributedConnection dc) throws JMSException
+ {
+ Log.log("JMSServer->connectionFailure(dc="+dc+")");
+
+ //We should try again :) This behavior should under control of a
Failure-Plugin
+ Log.error("I remove the Connection "+dc.getClientID()+" from the
subscribers list");
+
+ connectionClosing(dc);
+ }
+
+ // Gets the ClientConsumers mapped the the connection
+ // If the connection is not mapped, a new ClientConsumer is created
+ public ClientConsumer getClientConsumer(SpyDistributedConnection dc) throws
JMSException
+ {
+ ClientConsumer cq = (ClientConsumer)clientConsumers.get( dc );
+ if( cq == null ) {
+ cq = new ClientConsumer(this, dc);
+ clientConsumers.put( dc, cq );
+ }
+ return cq;
+ }
+
+ public JMSDestination getJMSDestination(SpyDestination dest)
+ {
+ return (JMSDestination)messageQueue.get(dest);
+ }
+
+ //A connection object wants to subscribe to a Destination
+ public void subscribe(SpyDistributedConnection dc, Subscription sub) throws
JMSException
+ {
+ Log.log("Server:
subscribe(dest="+sub.destination+",idConnection="+dc.getClientID()+")");
+
+ ClientConsumer ClientConsumer = getClientConsumer(dc);
+
+ ClientConsumer.addSubscription(sub);
+
+ }
+
+ public void unsubscribe(SpyDistributedConnection dc, int subscriptionId)
throws JMSException
+ {
+ Log.log("Server: unsubscribe(idConnection="+dc.getClientID()+")");
+
+ ClientConsumer ClientConsumer = getClientConsumer(dc);
+
+ ClientConsumer.removeSubscription(subscriptionId);
+
+ }
+
+ public SpyMessage[] browse(SpyDistributedConnection dc, Destination dest,
String selector) throws JMSException {
+ //ClientConsumer.addSubscription(sub);
+ JMSDestination queue = (JMSDestination)messageQueue.get(dest);
+ if( queue == null )
+ throw new JMSException("That destination does not exist");
+
+ return queue.browse(selector);
+ }
+ public void listenerChange(SpyDistributedConnection dc, int subscriberId,
boolean state) throws JMSException {
+
+ ClientConsumer ClientConsumer = getClientConsumer(dc);
+ ClientConsumer.listenerChange(subscriberId, state);
+
+ }
+ public SpyMessage receive(SpyDistributedConnection dc, int subscriberId, long
wait) throws JMSException {
+ ClientConsumer ClientConsumer = getClientConsumer(dc);
+ return ClientConsumer.receive(subscriberId, wait);
+ }
+ public void setEnabled(SpyDistributedConnection dc, boolean enabled) throws
JMSException {
+ ClientConsumer ClientConsumer = getClientConsumer(dc);
+ ClientConsumer.setEnabled(enabled);
+ }
+
+ public synchronized Queue createQueue(SpyDistributedConnection dc, String
name) throws JMSException
+ {
+ Log.log("createQueue("+name+")");
+
+ SpyQueue newQueue=new SpyQueue(name);
+ if (!messageQueue.containsKey(newQueue)) throw new JMSException("This
destination does not exist !");
+ return newQueue;
+ }
+
+ public synchronized Topic createTopic(SpyDistributedConnection dc, String
name) throws JMSException
+ {
+ Log.log("createTopic("+name+")");
+
+ SpyTopic newTopic=new SpyTopic(name);
+ if (!messageQueue.containsKey(newTopic)) throw new JMSException("This
destination does not exist !");
+ return newTopic;
+ }
+
+ public synchronized void deleteTemporaryDestination(SpyDistributedConnection
dc, SpyDestination dest)
+ {
+ Log.log("deleteDestination(dest="+dest.toString()+")");
+
+ synchronized (messageQueue) {
+ HashMap newMap=(HashMap)messageQueue.clone();
+ newMap.remove(dest);
+ messageQueue=newMap;
+ }
+
+ }
+
+ public void restoreMessage(SpyMessage message, String queueId) throws
JMSException
+ {
+ JMSDestination
queue=(JMSDestination)messageQueue.get(message.jmsDestination);
+ if (queue==null) throw new JMSException("This destination does not
exist !");
+ //Add the message to the queue
+ queue.restoreMessage(message, queueId);
+ }
+
+ public void saveConfig() throws java.io.IOException {
+
+ String file =
getClass().getClassLoader().getResource("spyderMQ.xml").getFile();
+ java.io.PrintStream stream = new java.io.PrintStream( new
java.io.FileOutputStream(file));
+ stream.print( serverConfig.toXML(true) );
+ stream.close();
+
}
}
1.3 +92 -72 spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/PersistenceManager.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- PersistenceManager.java 2000/12/19 06:43:36 1.2
+++ PersistenceManager.java 2000/12/23 15:48:24 1.3
@@ -19,7 +19,6 @@
import org.spydermq.persistence.SpyMessageLog;
import org.spydermq.SpyDestination;
import org.spydermq.SpyMessage;
-
import org.spydermq.SpyDistributedConnection;
/**
@@ -27,7 +26,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class PersistenceManager {
@@ -43,7 +42,52 @@
HashMap messageLogs = new HashMap();
// Maps (Long)txIds to LinkedList of Runnable tasks
HashMap postCommitTasks = new HashMap();
+ // Maps Global transactions to local transactions
+ HashMap globalToLocal = new HashMap();
+ // Maps (Long)txIds to LinkedList of Runnable tasks
+ HashMap postRollbackTasks = new HashMap();
+ class GlobalXID implements Runnable {
+ SpyDistributedConnection dc;
+ Object xid;
+
+ GlobalXID(SpyDistributedConnection dc,Object xid) {
+ this.dc = dc;
+ this.xid = xid;
+ }
+
+ public boolean equals(Object obj)
+ {
+ if (obj==null) return false;
+ if (obj.getClass()!=GlobalXID.class) return false;
+ return ((GlobalXID)obj).xid.equals( xid ) &&
+ ((GlobalXID)obj).dc.equals(dc);
+ }
+
+ public int hashCode() {
+ return xid.hashCode();
+ }
+
+ public void run() {
+ synchronized (globalToLocal) {
+ globalToLocal.remove(this);
+ }
+ }
+ }
+
+ static class LogInfo {
+ SpyMessageLog log;
+ SpyDestination destination;
+ String queueId;
+
+ LogInfo(SpyMessageLog log, SpyDestination destination, String queueId)
{
+ this.log=log;
+ this.destination=destination;
+ this.queueId=queueId;
+ }
+
+ }
+
/**
* PersistenceManager constructor.
*/
@@ -68,26 +112,29 @@
}
- public void add(org.spydermq.SpyDestination dest, org.spydermq.SpyMessage
message, Long txId) throws javax.jms.JMSException {
+ public void add(String queueId, org.spydermq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
- SpyMessageLog log;
+ LogInfo logInfo;
synchronized (messageLogs) {
- log = (SpyMessageLog) messageLogs.get(dest);
+ logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
}
- if (log == null)
+ if (logInfo == null)
throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
- synchronized (log) {
- log.add(message, txId);
- }
+ logInfo.log.add(message, txId);
}
public void addPostCommitTask(Long txId, Runnable task) throws
javax.jms.JMSException {
+ if( txId == null ) {
+ task.run();
+ return;
+ }
+
LinkedList tasks;
synchronized (postCommitTasks) {
tasks = (LinkedList) postCommitTasks.get(txId);
@@ -135,38 +182,19 @@
}
- public void initQueue(org.spydermq.SpyDestination dest) throws
javax.jms.JMSException {
-
- try {
-
- URL logFile = new URL(dataDirectory, dest.getName() + "-queue"
+ ".dat");
- SpyMessageLog log = new SpyMessageLog(logFile.getFile());
-
- messageLogs.put(dest, log);
-
- } catch (javax.jms.JMSException e) {
- throw e;
- } catch (Exception e) {
- javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
- newE.setLinkedException(e);
- throw newE;
- }
-
- }
-
- public void remove(org.spydermq.SpyDestination dest, org.spydermq.SpyMessage
message, Long txId) throws javax.jms.JMSException {
+ public void remove(String queueId, org.spydermq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
- SpyMessageLog log;
+ LogInfo logInfo;
synchronized (messageLogs) {
- log = (SpyMessageLog) messageLogs.get(dest);
+ logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
}
- if (log == null)
+ if (logInfo == null)
throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
- log.remove(message, null);
+ logInfo.log.remove(message, null);
}
@@ -179,19 +207,19 @@
clone = (HashMap) messageLogs.clone();
}
- Iterator iter = clone.keySet().iterator();
+ Iterator iter = clone.values().iterator();
while (iter.hasNext()) {
- SpyDestination dest = (SpyDestination) iter.next();
+
+ LogInfo logInfo = (LogInfo)iter.next();
- JMSServerQueue q = server.getServerQueue(dest);
- SpyMessageLog log = (SpyMessageLog) clone.get(dest);
+ JMSDestination q =
server.getJMSDestination(logInfo.destination);
- SpyMessage rebuild[] = log.restore(commitedTXs);
+ SpyMessage rebuild[] = logInfo.log.restore(commitedTXs);
- //TODO: make sure this sync lock is good enough
+ //TODO: make sure this lock is good enough
synchronized (q) {
for (int i = 0; i < rebuild.length; i++) {
- q.restoreMessage(rebuild[i]);
+ q.restoreMessage(rebuild[i], logInfo.queueId);
q.messageIdCounter =
Math.max(q.messageIdCounter, rebuild[i].messageId + 1);
}
}
@@ -220,42 +248,13 @@
}
}
-
- // Maps Global transactions to local transactions
- HashMap globalToLocal = new HashMap();
- // Maps (Long)txIds to LinkedList of Runnable tasks
- HashMap postRollbackTasks = new HashMap();
-
- class GlobalXID implements Runnable {
- SpyDistributedConnection dc;
- Object xid;
+
+ public void addPostRollbackTask(Long txId, Runnable task) throws
javax.jms.JMSException {
- GlobalXID(SpyDistributedConnection dc,Object xid) {
- this.dc = dc;
- this.xid = xid;
- }
-
- public boolean equals(Object obj)
- {
- if (obj==null) return false;
- if (obj.getClass()!=GlobalXID.class) return false;
- return ((GlobalXID)obj).xid.equals( xid ) &&
- ((GlobalXID)obj).dc.equals(dc);
+ if( txId == null ) {
+ return;
}
- public int hashCode() {
- return xid.hashCode();
- }
-
- public void run() {
- synchronized (globalToLocal) {
- globalToLocal.remove(this);
- }
- }
- }
-
- public void addPostRollbackTask(Long txId, Runnable task) throws
javax.jms.JMSException {
-
LinkedList tasks;
synchronized( postRollbackTasks ) {
tasks = (LinkedList)postRollbackTasks.get(txId);
@@ -294,5 +293,26 @@
throw new JMSException("Transaction does not exist from:
"+dc.getClientID()+" xid="+xid);
return txid;
+ }
+
+ public void initQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
+
+ try {
+
+ URL logFile = new URL(dataDirectory,
dest.toString()+"-"+queueId+".dat");
+ SpyMessageLog log = new SpyMessageLog(logFile.getFile());
+
+ LogInfo info = new LogInfo(log, dest, queueId);
+
+ messageLogs.put(""+dest+"-"+queueId, info);
+
+ } catch (javax.jms.JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+
}
}
1.4 +1 -1 spyderMQ/src/java/org/spydermq/server/SpyderMQService.java
Index: SpyderMQService.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/SpyderMQService.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyderMQService.java 2000/12/21 22:33:59 1.3
+++ SpyderMQService.java 2000/12/23 15:48:25 1.4
@@ -4,7 +4,6 @@
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
-
package org.spydermq.server;
import java.io.File;
@@ -27,8 +26,9 @@
*
* @author Vincent Sheffer ([EMAIL PROTECTED])
* @author <a href="mailto:[EMAIL PROTECTED]">Juha Lindfors</a>
+ * @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyderMQService
extends ServiceMBeanSupport
1.7 +6 -3 spyderMQ/src/java/org/spydermq/server/StartServer.java
Index: StartServer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/StartServer.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- StartServer.java 2000/12/21 22:33:59 1.6
+++ StartServer.java 2000/12/23 15:48:25 1.7
@@ -48,7 +48,7 @@
* @author Vincent Sheffer ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class StartServer implements Runnable
{
@@ -160,11 +160,14 @@
//Get an InitialContext
InitialContext ctx=new InitialContext();
+ //Create the JMSServer object
+ theServer = new JMSServer();
+
+ theServer.serverConfig = serverCfg;
+
//Create a SecurityManager object
securityManager=new SecurityManager();
-
- //Create the JMSServer object
- theServer = new JMSServer(securityManager);
+ theServer.securityManager = securityManager;
PersistenceManager persistenceManager = new
PersistenceManager(theServer, serverCfg.getElement("PersistenceManager"));
theServer.persistenceManager = persistenceManager;
1.1 spyderMQ/src/java/org/spydermq/server/AbstractQueue.java
Index: AbstractQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.JMSException;
import org.spydermq.SpyMessage;
/**
* This defines the interface for the queues. This is implemented by
* SharedQueue and ExclusiveQueue
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public interface AbstractQueue {
public void addConsumer(ClientConsumer consumer) throws JMSException;
public void addMessage(SpyMessage mes, Long txId) throws JMSException;
void notifyMessageAvailable();
public void removeConsumer(ClientConsumer consumer) throws JMSException;
}
1.1 spyderMQ/src/java/org/spydermq/server/ClientConsumer.java
Index: ClientConsumer.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.DeliveryMode;
import org.spydermq.*;
import org.spydermq.persistence.SpyMessageLog;
import org.spydermq.xml.XElement;
import java.util.Iterator;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.TreeSet;
/**
* This represent the clients queue which consumes messages from the
* destinations on the provider.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class ClientConsumer implements Task {
//List of messages waiting to be transmitted to the client
private LinkedList messages = new LinkedList();
//The JMSServer object
JMSServer server;
//The connection this queue will send messages over
SpyDistributedConnection dc;
//Maps a destination to a LinkedList of Subscriptions
public HashMap destinationSubscriptions = new HashMap();
//Is this connection enabled (Can we transmit to the receiver)
boolean enabled;
//Maps a a subsction id to a Subscription
public HashMap subscriptions = new HashMap();
//LinkedList of the the temporary destinations that this client created
public LinkedList temporaryDestinations = new LinkedList();
//List of messages that should be acked or else returned to thier
//owning exclusive queues.
public HashMap unacknowledgedMessages = new HashMap();
// Constructor ---------------------------------------------------
public ClientConsumer(JMSServer server, SpyDistributedConnection dc) throws
JMSException
{
this.server=server;
this.dc = dc;
}
void acknowledge(AcknowledgementRequest item, Long txId) throws
javax.jms.JMSException {
Log.log(""+this+"->acknowledge(item="+item+",txId="+txId+")");
// This task gets run to place the neg ack a messge (place it back on
the queue)
class RestoreMessageTask implements Runnable {
SpyMessage message;
int subscriptionId;
RestoreMessageTask(SpyMessage m,int subscriptionId) { message
= m; this.subscriptionId=subscriptionId; }
public void run() {
Log.log("Restoring message: " + message.jmsMessageID);
String queueId = JMSDestination.DEFAULT_QUEUE_ID;
if( message.jmsDestination instanceof SpyTopic ) {
// Still need to implement
//queueId
}
try {
server.restoreMessage(message,queueId);
} catch (JMSException ignore ) {
}
}
}
SpyMessage m;
synchronized (unacknowledgedMessages) {
m = (SpyMessage)unacknowledgedMessages.remove(item);
}
if (m == null)
return;
// Was it a negative acknowledge??
if (!item.isAck) {
Runnable task = new RestoreMessageTask(m, item.subscriberId);
server.persistenceManager.addPostCommitTask(txId, task);
} else {
if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
if( m.getJMSDestination() instanceof SpyQueue )
server.persistenceManager.remove("queue",
m,txId);
else
server.persistenceManager.remove(dc.getClientID(), m,txId);
}
Runnable task = new RestoreMessageTask(m,item.subscriberId);
server.persistenceManager.addPostRollbackTask(txId, task);
Log.log("Message Ack: " + m.messageId);
}
}
public void addMessage(SpyMessage message) throws JMSException
{
Log.log(""+this+"->addMessage(message="+message+")");
LinkedList l = (LinkedList)destinationSubscriptions.get(
message.getJMSDestination() );
if( l == null )
throw new JMSException("No subscription found for that
destination.");
Iterator subs = l.iterator();
while( subs.hasNext() ) {
Subscription s = (Subscription)subs.next();
if( s.accepts( message, false ) ) {
synchronized (messages) {
ReceiveRequest r = new ReceiveRequest();
r.message = message;
messages.add(r);
}
return;
}
}
}
public void addSubscription(Subscription req) throws JMSException
{
Log.log(""+this+"->addSubscription(req="+req+")");
req.dc = dc;
synchronized (subscriptions ) {
subscriptions.put(new Integer(req.subscriptionId), req );
LinkedList ll = (LinkedList)destinationSubscriptions.get(
req.destination );
if( ll == null ) {
ll = new LinkedList();
destinationSubscriptions.put(req.destination, ll );
JMSDestination
queue=(JMSDestination)server.getJMSDestination(req.destination);
if (queue==null) throw new JMSException("This
destination does not exist !");
if( queue.isTopic ) {
if( req.durableSubscriptionName!=null ) {
//
queue.addExclusiveConsumer(dc.getClientID(), this);
} else {
queue.addSharedConsumer(this);
}
} else {
queue.addExclusiveConsumer(queue.DEFAULT_QUEUE_ID, this);
}
}
ll.add( req );
}
}
//
public void close() {
Log.log(""+this+"->close()");
synchronized (subscriptions) {
Iterator i = subscriptions.keySet().iterator();
while( i.hasNext() ) {
Integer subscriptionId = (Integer)i.next();
try {
removeSubscription(subscriptionId.intValue());
} catch ( JMSException ignore ) {
}
}
}
synchronized (unacknowledgedMessages) {
Iterator i = unacknowledgedMessages.keySet().iterator();
while( i.hasNext() ) {
AcknowledgementRequest item =
(AcknowledgementRequest)i.next();
try {
acknowledge(item, null);
} catch ( JMSException ignore ) {
}
}
}
}
public org.spydermq.server.AbstractQueue getSubscribedQueue( Subscription req
) throws javax.jms.JMSException {
JMSDestination
queue=(JMSDestination)server.getJMSDestination(req.destination);
if (queue==null) throw new JMSException("This destination does not
exist !");
if( queue.isTopic ) {
if( req.durableSubscriptionName!=null ) {
return null;
} else {
return queue.sharedQueue;
}
} else {
return queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID);
}
}
public void listenerChange(int subscriberId, boolean state) throws
JMSException {
Log.log(""+this+"->listenerChange(subscriberId="+subscriberId+",
state="+state+")");
Subscription req = (Subscription)subscriptions.get(new
Integer(subscriberId));
if( req == null )
throw new JMSException("The provided subscription does not
exist");
req.listening = state;
if( req.listening ) {
// Notify the queue. It could be waiting for a consumer
getSubscribedQueue(req).notifyMessageAvailable();
}
}
public void notifyMessageAvailable() {
Log.log(""+this+"->notifyMessageAvailable()");
synchronized (messages) {
if( messages.size() == 0 )
return;
}
synchronized (server.taskQueue) {
server.taskQueue.addLast(this);
server.taskQueue.notify();
}
}
public SpyMessage receive(int subscriberId, long wait) throws JMSException {
Log.log(""+this+"->receive(subscriberId="+subscriberId+",
wait="+wait+")");
Subscription req = (Subscription)subscriptions.get(new
Integer(subscriberId));
if( req == null )
throw new JMSException("The provided subscription does not
exist");
JMSDestination queue = server.getJMSDestination(req.destination);
if( queue == null )
throw new JMSException("The subscription's destination does
not exist");
// Is it a receiveNoWait()
if( wait == -1 ) {
if( queue.isTopic ) {
if( req.durableSubscriptionName!=null ) {
// Not Implemented yet
//return
queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID).receiveMessage();
}
} else {
return
queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID).receiveMessage();
}
}
// Notify the queue. It could be waiting for a consumer
req.receiving = true;
getSubscribedQueue(req).notifyMessageAvailable();
return null;
}
public void removeSubscription(int subscriptionId) throws JMSException
{
Log.log(""+this+"->removeSubscription(subscriberId="+subscriptionId+")");
Subscription req;
synchronized (subscriptions ) {
req = (Subscription)subscriptions.remove(new
Integer(subscriptionId));
if( req == null )
throw new JMSException("The subscription had not been
previously registered");
LinkedList ll = (LinkedList)destinationSubscriptions.get(
req.destination );
if( ll == null )
throw new JMSException("The subscription was not
registered with the destination");
ll.remove( req );
if( ll.size() != 0 )
return;
// There is no subscriber for the destination at this point
destinationSubscriptions.remove(req.destination);
JMSDestination
queue=(JMSDestination)server.getJMSDestination(req.destination);
if (queue==null)
throw new JMSException("The subscription was registed
with a destination that does not exist !");
if( queue.isTopic ) {
if( req.durableSubscriptionName!=null ) {
//
queue.addExclusiveConsumer(dc.getClientID(), this);
} else {
queue.removeSharedConsumer(this);
}
} else {
queue.removeExclusiveConsumer(queue.DEFAULT_QUEUE_ID,
this);
}
}
}
// Iterate over the consumers asking them to take messages until they stop
// consuming.
synchronized public void run() throws JMSException
{
Log.log(""+this+"->run()");
ReceiveRequest[] job;
synchronized (messages) {
if( messages.size() == 0 )
return;
job=new ReceiveRequest[messages.size()];
job=(ReceiveRequest[])messages.toArray(job);
messages.clear();
}
try {
dc.cr.receive(job);
} catch ( Exception e ) {
server.connectionFailure(dc);
}
}
// Get the first message off the queue that I can. return false if none taken.
public boolean scanExclusiveQueue( ExclusiveQueue queue ) throws JMSException {
Log.log(""+this+"->scanExclusiveQueue(queue="+queue+")");
Iterator i = queue.messages.iterator();
while( i.hasNext() ) {
SpyMessage message = (SpyMessage)i.next();
synchronized (subscriptions) {
LinkedList l =
(LinkedList)destinationSubscriptions.get( message.getJMSDestination() );
if( l == null )
throw new JMSException("No subscription found
for that destination.");
Iterator subs = l.iterator();
while( subs.hasNext() ) {
Subscription s = (Subscription)subs.next();
if( s.accepts( message, true ) ) {
s.receiving = false;
i.remove();
synchronized (messages) {
ReceiveRequest r = new
ReceiveRequest();
r.message = message;
r.subscriptionId = new
Integer(s.subscriptionId);
messages.add(r);
AcknowledgementRequest ack =
new AcknowledgementRequest();
ack.destination =
message.getJMSDestination();
ack.messageID =
message.getJMSMessageID();
ack.subscriberId =
s.subscriptionId;
ack.isAck = false;
unacknowledgedMessages.put(ack, message);
}
notifyMessageAvailable();
return true;
}
}
}
}
return false;
}
public void setEnabled(boolean enabled) {
Log.log(""+this+"->setEnabled(enabled="+enabled+")");
this.enabled = enabled;
}
public String toString() {
return "ClientConsumer:"+dc.getClientID();
}
}
1.1 spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java
Index: ExclusiveQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.DeliveryMode;
import org.spydermq.*;
import org.spydermq.persistence.SpyMessageLog;
import org.spydermq.selectors.Selector;
import java.util.Iterator;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.TreeSet;
/**
* This class represents a queue which provides it's messages
* exclusivly to one consumer at a time.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class ExclusiveQueue implements Task, AbstractQueue {
//List of messages waiting to be dispatched
TreeSet messages = new TreeSet();
//The JMSServer object
JMSServer server;
//DistributedConnection objs that have "registered" to this Destination
private LinkedList consumers = new LinkedList();
//The queueId needed to identify this queue with the persistence manager.
String queueId;
//Used to put a message that was added previously to the queue, back in the
queue
public void restoreMessage(SpyMessage mes)
{
//restore a message to the message list...
synchronized (messages) {
messages.add(mes);
}
notifyMessageAvailable();
}
public void addMessage(SpyMessage mes, Long txId) throws JMSException
{
Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
// This task gets run to make the message visible in the queue.
class AddMessagePostCommitTask implements Runnable {
SpyMessage message;
AddMessagePostCommitTask(SpyMessage m) {
message = m;
}
public void run() {
//restore a message to the message list...
synchronized (messages) {
messages.add(message);
}
notifyMessageAvailable();
}
}
// Persist the message if it was persistent
if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT )
server.persistenceManager.add(queueId, mes, txId);
// The message gets added to the queue after the transaction
// commits (if the message was transacted)
Runnable task = new AddMessagePostCommitTask(mes);
if( txId == null ) {
task.run();
} else {
server.persistenceManager.addPostCommitTask(txId, task);
}
}
// Constructor ---------------------------------------------------
public ExclusiveQueue(JMSServer server, String queueId) throws JMSException
{
this.server=server;
this.queueId = queueId;
}
// synchrnozed so no message dispatching occurs while we add a consumer
synchronized public void addConsumer(ClientConsumer consumer) throws
JMSException
{
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage
to work concurently
synchronized (consumers) {
consumers.add(consumer);
}
}
public SpyMessage[] browse(String selector) throws JMSException {
if( selector == null ) {
SpyMessage list[];
synchronized (messages) {
list = new SpyMessage[messages.size()];
list = (SpyMessage [])messages.toArray(list);
}
return list;
} else {
Selector s = new Selector( selector );
LinkedList selection=new LinkedList();
synchronized (messages) {
Iterator i = messages.iterator();
while( i.hasNext() ) {
SpyMessage m = (SpyMessage)i.next();
if( s.test(m) )
selection.add(m);
}
}
SpyMessage list[];
list = new SpyMessage[selection.size()];
list = (SpyMessage [])selection.toArray(list);
return list;
}
}
public void notifyMessageAvailable() {
Log.log(""+this+"->notifyMessageAvailable()");
synchronized (server.taskQueue) {
server.taskQueue.addLast(this);
server.taskQueue.notify();
}
}
//Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
public SpyMessage receiveMessage() throws JMSException
{
synchronized (messages) {
if (messages.size()==0)
return null;
SpyMessage m = (SpyMessage)messages.first();
messages.remove(m);
return m;
}
}
public void removeConsumer(ClientConsumer consumer) throws JMSException
{
synchronized (consumers) {
consumers.remove(consumer);
}
}
// Iterate over the consumers asking them to take messages until they stop
// consuming.
public void run() throws JMSException
{
Log.log(""+this+"->run()");
synchronized (messages) {
synchronized (consumers) {
LinkedList consumersDone = new LinkedList();
while( consumers.size()!=0 && messages.size() != 0) {
ClientConsumer consumer =
(ClientConsumer)consumers.removeFirst();
// Tell the consumer to scan the message queue
if( consumer.scanExclusiveQueue(this) ) {
// The consumer consumed a message.
// So place him at the back of the
consumer list
consumers.addLast(consumer);
} else {
// The consumer did not find a message
to consume,
// Place him at the back of the done
list
consumersDone.addLast(consumer);
}
}
// Add all the consumers that were done, back into the
consumer
// list.
while( consumersDone.size() != 0 ) {
consumers.addLast(consumersDone.removeFirst());
}
}
}
}
public String toString() {
return "ExclusiveQueue:"+queueId;
}
}
1.1 spyderMQ/src/java/org/spydermq/server/JMSDestination.java
Index: JMSDestination.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.DeliveryMode;
import org.spydermq.*;
import org.spydermq.persistence.SpyMessageLog;
import java.util.Iterator;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.TreeSet;
/**
* This class is a message queue which is stored (hashed by Destination) on the
* JMS provider
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class JMSDestination {
public static final String DEFAULT_QUEUE_ID = "queue";
//the Destination of this queue
SpyDestination destination;
//If this is a temporaryDestination, temporaryDestination=ClientConsumer of
the owner, otherwise it's null
ClientConsumer temporaryDestination;
//The JMSServer object
JMSServer server;
//Am I a queue or a topic
boolean isTopic;
//Counter used to number incomming messages. (Used to order the messages.)
long messageIdCounter = Long.MIN_VALUE;
//Hashmap of ExclusiveQueues
HashMap exclusiveQueues = new HashMap();
//ShareQueue used for topics
SharedQueue sharedQueue;
// Constructor ---------------------------------------------------
JMSDestination(SpyDestination dest,ClientConsumer temporary,JMSServer server)
throws JMSException
{
destination=dest;
temporaryDestination=temporary;
this.server=server;
isTopic=dest instanceof SpyTopic;
sharedQueue = new SharedQueue(server);
// If this is not a temp destination, then we should persist data
if( temporaryDestination == null ) {
if( isTopic ) {
// Not Implemented yet
// TODO: init durable topic subscriber exclusive
queues here
} else {
exclusiveQueues.put(DEFAULT_QUEUE_ID, new
ExclusiveQueue(server, DEFAULT_QUEUE_ID));
server.persistenceManager.initQueue(dest,
DEFAULT_QUEUE_ID);
}
}
}
public void addMessage(SpyMessage mes, Long txId) throws JMSException
{
Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
//Number the message so that we can preserve order of delivery.
mes.messageId = messageIdCounter++;
if( isTopic ) {
sharedQueue.addMessage(mes, txId);
synchronized (exclusiveQueues) {
if( exclusiveQueues.size() == 0 )
return;
Iterator iter = exclusiveQueues.values().iterator();
while( iter.hasNext() ) {
ExclusiveQueue eq =
(ExclusiveQueue)iter.next();
eq.addMessage(mes, txId);
}
}
} else {
ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get(
DEFAULT_QUEUE_ID );
eq.addMessage(mes, txId);
}
}
// Package protected ---------------------------------------------
void addExclusiveConsumer(String queue, ClientConsumer c) throws JMSException {
Log.log(""+this+"->addExclusiveConsumer(queue="+queue+",
consumer="+c+")");
ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
if( eq == null )
throw new JMSException("That destination queue does not
exist");
eq.addConsumer(c);
}
// Package protected ---------------------------------------------
void addSharedConsumer(ClientConsumer c) throws JMSException {
Log.log(""+this+"->addSharedConsumer(consumer="+c+")");
sharedQueue.addConsumer(c);
}
public SpyMessage[] browse(String selector) throws JMSException {
Log.log(""+this+"->browse(selector="+selector+")");
ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get(
DEFAULT_QUEUE_ID );
return eq.browse( selector );
}
// Package protected ---------------------------------------------
ExclusiveQueue getExclusiveQueue(String queue) throws JMSException {
ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
if( eq == null )
throw new JMSException("That destination queue does not
exist");
return eq;
}
// Package protected ---------------------------------------------
void removeConsumerFromAll(ClientConsumer c) throws JMSException {
Log.log(""+this+"->removeConsumerFromAll(consumer="+c+")");
sharedQueue.removeConsumer(c);
Iterator i = exclusiveQueues.values().iterator();
while ( i.hasNext() ) {
ExclusiveQueue eq = (ExclusiveQueue)i.next();
eq.removeConsumer(c);
}
}
// Package protected ---------------------------------------------
void removeExclusiveConsumer(String queue, ClientConsumer c) throws
JMSException {
Log.log(""+this+"->removeExclusiveConsumer(queue="+queue+",
consumer="+c+")");
ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
if( eq == null )
throw new JMSException("That destination queue does not
exist");
eq.removeConsumer(c);
}
// Package protected ---------------------------------------------
void removeSharedConsumer(ClientConsumer c) throws JMSException {
Log.log(""+this+"->removeSharedConsumer(consumer="+c+")");
sharedQueue.removeConsumer(c);
}
//Used to put a message that was added previously to the queue, back in the
queue
public void restoreMessage(SpyMessage mes, String queueId) throws JMSException
{
Log.log(""+this+"->restoreMessage(mes="+mes+",queue="+queueId+")");
ExclusiveQueue eq = getExclusiveQueue(queueId);
eq.restoreMessage(mes);
}
public String toString() {
return "JMSDestination:"+destination;
}
}
1.1 spyderMQ/src/java/org/spydermq/server/SharedQueue.java
Index: SharedQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.DeliveryMode;
import org.spydermq.*;
import org.spydermq.persistence.SpyMessageLog;
import java.util.Iterator;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.TreeSet;
/**
* This class is a message queue which allows sending a single message
* to multiple consumers.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class SharedQueue implements Task, AbstractQueue {
//List of Pending messages
private TreeSet messages;
//The JMSServer object
JMSServer server;
//DistributedConnection objs that have "registered" to this Destination
private LinkedList consumers;
// Constructor ---------------------------------------------------
SharedQueue(JMSServer server) throws JMSException
{
this.server=server;
consumers=new LinkedList();
messages=new TreeSet();
}
public void addMessage(SpyMessage mes, Long txId) throws JMSException
{
// This task gets run to make the message visible in the queue.
class AddMessagePostCommitTask implements Runnable {
SpyMessage message;
AddMessagePostCommitTask(SpyMessage m) {
message = m;
}
public void run() {
synchronized (messages)
{
//Add the message to the queue
messages.add(message);
notifyMessageAvailable();
}
}
}
// The message gets added to the queue after the transaction
// commits (if the message was transacted)
Runnable task = new AddMessagePostCommitTask(mes);
if( txId == null ) {
task.run();
} else {
server.persistenceManager.addPostCommitTask(txId, task);
}
}
// Package protected ---------------------------------------------
public void addConsumer(ClientConsumer consumer) throws JMSException
{
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage
to work concurently
synchronized (consumers) {
consumers.add(consumer);
}
}
public void notifyMessageAvailable() {
synchronized (server.taskQueue) {
server.taskQueue.addLast(this);
server.taskQueue.notify();
}
}
public void removeConsumer(ClientConsumer consumer) throws JMSException
{
synchronized (consumers) {
consumers.remove(consumer);
}
}
// This will dispatch messages in the queue the the ClientConsumers
synchronized public void run() throws JMSException
{
SpyMessage[] job;
synchronized (messages) {
if( messages.size() == 0 )
return;
job=new SpyMessage[messages.size()];
job=(SpyMessage[])messages.toArray(job);
messages.clear();
}
Iterator iter = consumers.iterator();
while( iter.hasNext() ) {
ClientConsumer consumer = (ClientConsumer)iter.next();
for( int i=0 ; i < job.length; i++ )
consumer.addMessage(job[i]);
consumer.notifyMessageAvailable();
}
}
}
1.1 spyderMQ/src/java/org/spydermq/server/Task.java
Index: Task.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.JMSException;
/**
* A interface similar to the Runnable interface except this
* one allows run() to throw a JMSException.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public interface Task {
public void run() throws JMSException;
}