User: hiram
Date: 00/12/27 09:02:23
Modified: src/java/org/spydermq/server JMSServer.java
ClientConsumer.java JMSDestination.java
StartServer.java PersistenceManager.java
Added: src/java/org/spydermq/server UserManager.java
Log:
Feature Add: Durable Topic Subscriptions now work!
More work still has to be done with user management (who
is allowed to create durable subscriptions). The DurableSubscriptionExample
class now works.
Revision Changes Path
1.8 +3 -2 spyderMQ/src/java/org/spydermq/server/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- JMSServer.java 2000/12/26 04:16:00 1.7
+++ JMSServer.java 2000/12/27 17:02:21 1.8
@@ -19,15 +19,15 @@
import org.spydermq.*;
import org.spydermq.xml.XElement;
-import org.spydermq.security.UserManager;
+
/**
* This class implements the JMS provider
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class JMSServer
implements Runnable, JMSServerMBean
@@ -443,13 +443,14 @@
}
public SpyMessage[] browse(SpyDistributedConnection dc, Destination dest,
String selector) throws JMSException {
- //ClientConsumer.addSubscription(sub);
JMSDestination queue = (JMSDestination)messageQueue.get(dest);
if( queue == null )
throw new JMSException("That destination does not exist");
return queue.browse(selector);
}
+
+
public void listenerChange(SpyDistributedConnection dc, int subscriberId,
boolean state) throws JMSException {
ClientConsumer ClientConsumer = getClientConsumer(dc);
1.3 +13 -6 spyderMQ/src/java/org/spydermq/server/ClientConsumer.java
Index: ClientConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ClientConsumer.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- ClientConsumer.java 2000/12/24 01:55:07 1.2
+++ ClientConsumer.java 2000/12/27 17:02:21 1.3
@@ -27,7 +27,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class ClientConsumer implements Task {
@@ -69,8 +69,8 @@
Log.log("Restoring message: " + message.jmsMessageID);
String queueId;
if( message.jmsDestination instanceof SpyTopic ) {
- // Still need to implement
- queueId = null;
+ Subscription req =
(Subscription)subscriptions.get(new Integer(subscriptionId));
+ queueId =
JMSDestination.durableSubscriptionToQueueId(dc.getClientID(),req.durableSubscriptionName);
} else {
queueId = JMSDestination.DEFAULT_QUEUE_ID;
}
@@ -164,7 +164,12 @@
if( queue.isTopic ) {
if( req.durableSubscriptionName!=null ) {
+
//
queue.addExclusiveConsumer(dc.getClientID(), this);
+
server.userManager.setDurableSubscription(dc.getClientID(),req.durableSubscriptionName,(SpyTopic)req.destination);
+ String queueId =
queue.durableSubscriptionToQueueId(dc.getClientID(),req.durableSubscriptionName);
+ queue.addExclusiveConsumer(queueId,
this);
+
} else {
queue.addSharedConsumer(this);
}
@@ -220,7 +225,8 @@
if( queue.isTopic ) {
if( req.durableSubscriptionName!=null ) {
- return null;
+ String queueId =
JMSDestination.durableSubscriptionToQueueId(dc.getClientID(),req.durableSubscriptionName);
+ return queue.getExclusiveQueue(queueId );
} else {
return queue.sharedQueue;
}
@@ -279,8 +285,8 @@
if( wait == -1 ) {
if( queue.isTopic ) {
if( req.durableSubscriptionName!=null ) {
- // Not Implemented yet
- //return
queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID).receiveMessage();
+ String queueId =
JMSDestination.durableSubscriptionToQueueId(dc.getClientID(),req.durableSubscriptionName);
+ return
queue.getExclusiveQueue(queueId).receiveMessage();
}
} else {
return
queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID).receiveMessage();
@@ -332,7 +338,8 @@
if( queue.isTopic ) {
if( req.durableSubscriptionName!=null ) {
- //
queue.addExclusiveConsumer(dc.getClientID(), this);
+ String queueId =
queue.durableSubscriptionToQueueId(dc.getClientID(),req.durableSubscriptionName);
+ queue.removeExclusiveConsumer(queueId, this);
} else {
queue.removeSharedConsumer(this);
}
1.3 +47 -8 spyderMQ/src/java/org/spydermq/server/JMSDestination.java
Index: JMSDestination.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSDestination.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- JMSDestination.java 2000/12/24 01:55:07 1.2
+++ JMSDestination.java 2000/12/27 17:02:21 1.3
@@ -26,7 +26,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class JMSDestination {
@@ -108,7 +108,7 @@
Log.log(""+this+"->addExclusiveConsumer(queue="+queue+",
consumer="+c+")");
- ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
+ ExclusiveQueue eq = getExclusiveQueue( queue );
if( eq == null )
throw new JMSException("That destination queue does not
exist");
@@ -123,14 +123,16 @@
public SpyMessage[] browse(String selector) throws JMSException {
Log.log(""+this+"->browse(selector="+selector+")");
- ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get(
DEFAULT_QUEUE_ID );
+ ExclusiveQueue eq = getExclusiveQueue( DEFAULT_QUEUE_ID );
return eq.browse( selector );
}
// Package protected ---------------------------------------------
ExclusiveQueue getExclusiveQueue(String queue) {
- return (ExclusiveQueue)exclusiveQueues.get( queue );
+ synchronized (exclusiveQueues) {
+ return (ExclusiveQueue)exclusiveQueues.get( queue );
+ }
}
@@ -139,10 +141,13 @@
Log.log(""+this+"->removeConsumerFromAll(consumer="+c+")");
sharedQueue.removeConsumer(c);
- Iterator i = exclusiveQueues.values().iterator();
- while ( i.hasNext() ) {
- ExclusiveQueue eq = (ExclusiveQueue)i.next();
- eq.removeConsumer(c);
+
+ synchronized (exclusiveQueues) {
+ Iterator i = exclusiveQueues.values().iterator();
+ while ( i.hasNext() ) {
+ ExclusiveQueue eq = (ExclusiveQueue)i.next();
+ eq.removeConsumer(c);
+ }
}
}
@@ -152,7 +157,7 @@
Log.log(""+this+"->removeExclusiveConsumer(queue="+queue+",
consumer="+c+")");
- ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
+ ExclusiveQueue eq = getExclusiveQueue( queue );
if( eq == null )
throw new JMSException("That destination queue does not
exist");
@@ -176,4 +181,38 @@
public String toString() {
return "JMSDestination:"+destination;
}
+
+ public void createDurableSubscription(String clientId, String
subscriptionName) throws JMSException
+ {
+ if( !isTopic )
+ throw new JMSException("Not a valid operation on a Queue");
+
+ String queueId =
durableSubscriptionToQueueId(clientId,subscriptionName);
+
+ synchronized (exclusiveQueues) {
+ exclusiveQueues.put(queueId, new ExclusiveQueue(server,
queueId));
+ }
+
+ server.persistenceManager.initQueue(destination, queueId);
+
+ }
+
+ public void destoryDurableSubscription(String clientId, String
subscriptionName) throws JMSException
+ {
+ if( !isTopic )
+ throw new JMSException("Not a valid operation on a Queue");
+
+ String queueId =
durableSubscriptionToQueueId(clientId,subscriptionName);
+ synchronized (exclusiveQueues) {
+ exclusiveQueues.remove(queueId);
+ }
+ server.persistenceManager.destroyQueue(destination, queueId);
+
+ }
+
+ static public String durableSubscriptionToQueueId(String clientId, String
subscriptionName)
+ {
+ return clientId+"-"+subscriptionName;
+ }
+
}
1.10 +16 -25 spyderMQ/src/java/org/spydermq/server/StartServer.java
Index: StartServer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/StartServer.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- StartServer.java 2000/12/26 04:16:00 1.9
+++ StartServer.java 2000/12/27 17:02:21 1.10
@@ -27,6 +27,7 @@
import java.util.Set;
import java.util.LinkedList;
import java.util.Iterator;
+import java.util.Enumeration;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
@@ -36,8 +37,8 @@
import org.spydermq.SpyTopicConnectionFactory;
import org.spydermq.xml.XElement;
import org.spydermq.persistence.SpyTxLog;
-import org.spydermq.security.UserManager;
+
/**
* Class used to start a JMS service. This can be called from inside another
@@ -48,7 +49,7 @@
* @author Vincent Sheffer ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class StartServer implements Runnable
{
@@ -163,9 +164,10 @@
theServer.serverConfig = serverCfg;
//Create a UserManager object
- UserManager userManager=new UserManager();
+ UserManager userManager=new UserManager(theServer,
serverCfg.getElement("UserManager"));
theServer.userManager = userManager;
+ //Creatye a PersistenceManager object
PersistenceManager persistenceManager = new
PersistenceManager(theServer, serverCfg.getElement("PersistenceManager"));
theServer.persistenceManager = persistenceManager;
@@ -174,9 +176,9 @@
//create the known topics
Context subcontext=ctx.createSubcontext("topic");
- Iterator iter = serverCfg.getElementsNamed("Topic");
- while( iter.hasNext() ) {
- XElement element = (XElement)iter.next();
+ Enumeration enum = serverCfg.getElementsNamed("Topic");
+ while( enum.hasMoreElements() ) {
+ XElement element = (XElement)enum.nextElement();
String name = element.getField("Name");
Topic t=theServer.newTopic(name);
@@ -187,36 +189,25 @@
//create the known queues
subcontext=ctx.createSubcontext("queue");
- iter = serverCfg.getElementsNamed("Queue");
- while( iter.hasNext() ) {
- XElement element = (XElement)iter.next();
+ enum = serverCfg.getElementsNamed("Queue");
+ while( enum.hasMoreElements() ) {
+ XElement element = (XElement)enum.nextElement();
String name = element.getField("Name");
Queue q=theServer.newQueue(name);
subcontext.rebind(name,q);
}
-
- //Set the known Ids
- iter = serverCfg.getElementsNamed("User");
- while( iter.hasNext() ) {
- XElement element = (XElement)iter.next();
- String name = element.getField("Name");
- String passwd = element.getField("Password");
- if( element.containsField("Id") ) {
-
userManager.addUser(name,passwd,element.getField("Id"));
- } else {
- userManager.addUser(name,passwd,null);
- }
- }
+ // Resubscribe the durable subscriptions
+ userManager.initDurableSubscriptions();
- // Restore the persistent messages to thie queues.
+ // Restore the persistent messages to thier queues.
theServer.persistenceManager.restore();
- iter = serverCfg.getElementsNamed("InvocationLayer");
- while( iter.hasNext() ) {
+ enum = serverCfg.getElementsNamed("InvocationLayer");
+ while( enum.hasMoreElements()) {
- XElement element = (XElement)iter.next();
+ XElement element = (XElement)enum.nextElement();
String name = element.getField("Name");
String topicConnectionFactoryJNDI =
element.getField("TopicConnectionFactoryJNDI");
String queueConnectionFactoryJNDI =
element.getField("QueueConnectionFactoryJNDI");
1.5 +24 -0 spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/PersistenceManager.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- PersistenceManager.java 2000/12/24 01:55:06 1.4
+++ PersistenceManager.java 2000/12/27 17:02:22 1.5
@@ -26,7 +26,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class PersistenceManager {
@@ -305,6 +305,30 @@
LogInfo info = new LogInfo(log, dest, queueId);
messageLogs.put(""+dest+"-"+queueId, info);
+
+ } catch (javax.jms.JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+
+ }
+
+ public void destroyQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
+
+ try {
+
+ URL logFile = new URL(dataDirectory,
dest.toString()+"-"+queueId+".dat");
+ java.io.File file = new java.io.File(logFile.getFile());
+
+ SpyMessageLog log =
(SpyMessageLog)messageLogs.remove(""+dest+"-"+queueId);
+ if( log == null )
+ throw new JMSException("The persistence log was never
initialized");
+ log.close();
+
+ file.delete();
} catch (javax.jms.JMSException e) {
throw e;
1.1 spyderMQ/src/java/org/spydermq/server/UserManager.java
Index: UserManager.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import java.util.Hashtable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Collection;
import java.util.Enumeration;
import javax.jms.JMSException;
import org.spydermq.xml.XElement;
import org.spydermq.Log;
import org.spydermq.SpyTopic;
import org.spydermq.server.JMSServer;
/**
* This class is a simple User Manager. It handles credential issues.
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class UserManager
{
//Known users hashed by login
private Hashtable users;
//registered clientID
private HashSet clientID;
JMSServer server;
org.spydermq.xml.XElement userManagerConfig;
public void addUser(String login,String passwd,String clientID)
{
users.put(login,new Identity(login,passwd,clientID));
}
public String checkUser(String login,String passwd) throws JMSException
{
Identity user=(Identity)users.get(login);
if (user==null) throw new JMSException("This user does not exist");
if (!passwd.equals(user.passwd)) throw new JMSException("Bad
password");
if (user.clientID!=null) {
if (clientID.contains(user.clientID)) throw new
JMSException("This clientID is already registered !");
clientID.add(user.clientID);
}
return user.clientID;
}
public void check(String login,String passwd,String clientID)
{
synchronized (users) {
users.put(login,new Identity(login,passwd,clientID));
}
}
public void addClientID(String ID) throws JMSException
{
//Check : this ID must not be registered
if (clientID.contains(ID)) throw new JMSException("This clientID is
already registered !");
//Check : this ID must not be password protected
synchronized (users) {
Iterator i=users.values().iterator();
if (i!=null) {
while (i.hasNext()) {
Identity id=(Identity)i.next();
if (id.clientID!=null)
if (id.clientID.equals(ID))
throw new JMSException("This
clientID is password protected !");
}
}
}
clientID.add(ID);
}
public void removeID(String ID)
{
clientID.remove(ID);
}
public class Identity {
String login;
String passwd;
String clientID;
Identity(String login,String passwd,String clientID)
{
this.login=login;
this.passwd=passwd;
this.clientID=clientID;
}
}
public UserManager(JMSServer server, XElement userManagerConfig) throws
org.spydermq.xml.XElementException
{
this.server=server;
this.userManagerConfig=userManagerConfig;
users=new Hashtable();
clientID=new HashSet();
//Set the known Ids
Enumeration enum = userManagerConfig.getElementsNamed("User");
while( enum.hasMoreElements() ) {
XElement element = (XElement)enum.nextElement();
String name = element.getField("Name");
String passwd = element.getField("Password");
if( element.containsField("Id") ) {
addUser(name,passwd,element.getField("Id"));
} else {
addUser(name,passwd,null);
}
}
}
public void initDurableSubscriptions() throws
org.spydermq.xml.XElementException {
//Set the known Ids
Enumeration enum =
userManagerConfig.getElementsNamed("User/DurableSubscription");
while( enum.hasMoreElements() ) {
XElement element = (XElement)enum.nextElement();
String clientId = element.getField("../Id");
String name = element.getField("Name");
String topicName = element.getField("TopicName");
try {
Log.log("Restarting Durable Subscription:
"+clientId+","+name+","+topicName);
SpyTopic topic=new SpyTopic(topicName);
JMSDestination dest = server.getJMSDestination(topic);
dest.createDurableSubscription(clientId, name);
} catch (JMSException e ) {
Log.error("Could not initialize a durable subscription
for : Client Id="+clientId+", Name="+name+", Topic Name="+topicName);
Log.error(e);
}
}
}
public void setDurableSubscription(String clientId, String name, SpyTopic
topic) throws JMSException {
try {
//Set the known Ids
Enumeration enum = userManagerConfig.getElementsNamed("User");
while( enum.hasMoreElements() ) {
// Match the User.Name
XElement user = (XElement)enum.nextElement();
if( !user.containsField("Id") ||
!user.getField("Id").equals(clientId) )
continue;
XElement subscription=null;
// Match the User/DurableSubscription.Name
Enumeration enum2 =
user.getElementsNamed("DurableSubscription");
while( enum2.hasMoreElements() ) {
XElement t = (XElement)enum2.nextElement();
if( t.getField("Name").equals(name)) {
subscription = t;
break;
}
}
if( subscription == null ) {
// it was not previously registered...
if( topic == null )
return;
subscription = new
XElement("DurableSubscription");
subscription.addField("Name", name);
subscription.addField("TopicName",
topic.getName());
user.addElement(subscription);
JMSDestination dest =
server.getJMSDestination(topic);
dest.createDurableSubscription(clientId, name);
server.saveConfig();
} else {
// it was previously registered...
if(
subscription.getField("TopicName").equals(topic.getName()) ) {
// and it is the same as before, do
nothing.
return;
} else {
// we have to change the
subscription...
SpyTopic prevTopic = new SpyTopic(
subscription.getField("TopicName") );
JMSDestination dest =
server.getJMSDestination(prevTopic);
dest.destoryDurableSubscription(clientId, name);
if( topic == null ) {
subscription.removeFromParent();
} else {
subscription.setField("TopicName", topic.getName());
dest =
server.getJMSDestination(topic);
dest.createDurableSubscription(clientId, name);
}
server.saveConfig();
}
}
return;
}
} catch ( java.io.IOException e ) {
JMSException newE = new JMSException("Could not setup the
durable subscription");
newE.setLinkedException(e);
throw newE;
} catch ( org.spydermq.xml.XElementException e ) {
JMSException newE = new JMSException("Could not setup the
durable subscription");
newE.setLinkedException(e);
throw newE;
}
}
}