User: hiram   
  Date: 00/12/27 09:02:23

  Modified:    src/java/org/spydermq/server JMSServer.java
                        ClientConsumer.java JMSDestination.java
                        StartServer.java PersistenceManager.java
  Added:       src/java/org/spydermq/server UserManager.java
  Log:
  Feature Add: Durable Topic Subscriptions now work!
  More work still has to be done with user management (who
  is allowed to create durable subscriptions).  The DurableSubscriptionExample
  class now works.
  
  Revision  Changes    Path
  1.8       +3 -2      spyderMQ/src/java/org/spydermq/server/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- JMSServer.java    2000/12/26 04:16:00     1.7
  +++ JMSServer.java    2000/12/27 17:02:21     1.8
  @@ -19,15 +19,15 @@
   
   import org.spydermq.*;
   import org.spydermq.xml.XElement;
  -import org.spydermq.security.UserManager;
   
  +
   /**
    *   This class implements the JMS provider
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.7 $
  + *   @version $Revision: 1.8 $
    */
   public class JMSServer 
                implements Runnable, JMSServerMBean
  @@ -443,13 +443,14 @@
        }
   
        public SpyMessage[] browse(SpyDistributedConnection dc, Destination dest, 
String selector) throws JMSException {
  -             //ClientConsumer.addSubscription(sub);
                JMSDestination queue = (JMSDestination)messageQueue.get(dest);
                if( queue == null )
                        throw new JMSException("That destination does not exist");
                        
                return queue.browse(selector);
        }
  +
  +     
        public void listenerChange(SpyDistributedConnection dc, int subscriberId, 
boolean state) throws JMSException {
   
                ClientConsumer ClientConsumer = getClientConsumer(dc);
  
  
  
  1.3       +13 -6     spyderMQ/src/java/org/spydermq/server/ClientConsumer.java
  
  Index: ClientConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ClientConsumer.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ClientConsumer.java       2000/12/24 01:55:07     1.2
  +++ ClientConsumer.java       2000/12/27 17:02:21     1.3
  @@ -27,7 +27,7 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class ClientConsumer implements Task {
   
  @@ -69,8 +69,8 @@
                                Log.log("Restoring message: " + message.jmsMessageID);
                                String queueId;
                                if( message.jmsDestination instanceof SpyTopic ) {
  -                                     // Still need to implement
  -                                     queueId = null;
  +                                     Subscription req = 
(Subscription)subscriptions.get(new Integer(subscriptionId));
  +                                     queueId = 
JMSDestination.durableSubscriptionToQueueId(dc.getClientID(),req.durableSubscriptionName);
                                } else {
                                        queueId = JMSDestination.DEFAULT_QUEUE_ID;
                                }
  @@ -164,7 +164,12 @@
                                                                
                                if( queue.isTopic ) {
                                        if( req.durableSubscriptionName!=null ) {
  +                                             
                                                // 
queue.addExclusiveConsumer(dc.getClientID(), this);
  +                                             
server.userManager.setDurableSubscription(dc.getClientID(),req.durableSubscriptionName,(SpyTopic)req.destination);
  +                                             String queueId = 
queue.durableSubscriptionToQueueId(dc.getClientID(),req.durableSubscriptionName);
  +                                             queue.addExclusiveConsumer(queueId, 
this);
  +                                             
                                        } else {
                                                queue.addSharedConsumer(this);         
                 
                                        }
  @@ -220,7 +225,8 @@
   
                if( queue.isTopic ) {
                        if( req.durableSubscriptionName!=null ) {
  -                             return null;
  +                             String queueId = 
JMSDestination.durableSubscriptionToQueueId(dc.getClientID(),req.durableSubscriptionName);
  +                             return queue.getExclusiveQueue(queueId );
                        } else {
                                return queue.sharedQueue;
                        }
  @@ -279,8 +285,8 @@
                if( wait == -1 ) {
                        if( queue.isTopic ) {
                                if( req.durableSubscriptionName!=null ) {
  -                                     // Not Implemented yet
  -                                     //return 
queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID).receiveMessage();
  +                                     String queueId = 
JMSDestination.durableSubscriptionToQueueId(dc.getClientID(),req.durableSubscriptionName);
  +                                     return 
queue.getExclusiveQueue(queueId).receiveMessage();
                                }
                        } else  {
                                return 
queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID).receiveMessage();
  @@ -332,7 +338,8 @@
   
                        if( queue.isTopic ) {
                                if( req.durableSubscriptionName!=null ) {
  -                                     // 
queue.addExclusiveConsumer(dc.getClientID(), this);
  +                                     String queueId = 
queue.durableSubscriptionToQueueId(dc.getClientID(),req.durableSubscriptionName);
  +                                     queue.removeExclusiveConsumer(queueId, this);
                                } else {
                                        queue.removeSharedConsumer(this);
                                }
  
  
  
  1.3       +47 -8     spyderMQ/src/java/org/spydermq/server/JMSDestination.java
  
  Index: JMSDestination.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSDestination.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- JMSDestination.java       2000/12/24 01:55:07     1.2
  +++ JMSDestination.java       2000/12/27 17:02:21     1.3
  @@ -26,7 +26,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class JMSDestination {
   
  @@ -108,7 +108,7 @@
   
                Log.log(""+this+"->addExclusiveConsumer(queue="+queue+", 
consumer="+c+")");
                
  -             ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
  +             ExclusiveQueue eq = getExclusiveQueue( queue );
                if( eq == null )
                        throw new JMSException("That destination queue does not 
exist");
                
  @@ -123,14 +123,16 @@
   
        public SpyMessage[] browse(String selector) throws JMSException {
                Log.log(""+this+"->browse(selector="+selector+")");             
  -             ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( 
DEFAULT_QUEUE_ID );
  +             ExclusiveQueue eq = getExclusiveQueue( DEFAULT_QUEUE_ID );
                return eq.browse( selector );
        }
   
        // Package protected ---------------------------------------------          
        ExclusiveQueue getExclusiveQueue(String queue) {
                
  -             return (ExclusiveQueue)exclusiveQueues.get( queue );
  +             synchronized (exclusiveQueues) {
  +                     return (ExclusiveQueue)exclusiveQueues.get( queue );           
 
  +             }
                
        }
   
  @@ -139,10 +141,13 @@
                Log.log(""+this+"->removeConsumerFromAll(consumer="+c+")");
   
                sharedQueue.removeConsumer(c);
  -             Iterator i = exclusiveQueues.values().iterator();
  -             while ( i.hasNext() ) {
  -                     ExclusiveQueue eq = (ExclusiveQueue)i.next();
  -                     eq.removeConsumer(c);
  +
  +             synchronized (exclusiveQueues) {
  +                     Iterator i = exclusiveQueues.values().iterator();
  +                     while ( i.hasNext() ) {
  +                             ExclusiveQueue eq = (ExclusiveQueue)i.next();
  +                             eq.removeConsumer(c);
  +                     }
                }
                
        }
  @@ -152,7 +157,7 @@
   
                Log.log(""+this+"->removeExclusiveConsumer(queue="+queue+", 
consumer="+c+")");
                
  -             ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( queue );
  +             ExclusiveQueue eq = getExclusiveQueue( queue );
                if( eq == null )
                        throw new JMSException("That destination queue does not 
exist");
                
  @@ -176,4 +181,38 @@
        public String toString() {
                return "JMSDestination:"+destination;
        }
  +
  +     public void createDurableSubscription(String clientId, String 
subscriptionName) throws JMSException
  +     {
  +             if( !isTopic ) 
  +                     throw new JMSException("Not a valid operation on a Queue");
  +
  +             String queueId = 
durableSubscriptionToQueueId(clientId,subscriptionName);
  +             
  +             synchronized (exclusiveQueues) {
  +                     exclusiveQueues.put(queueId, new ExclusiveQueue(server, 
queueId));              
  +             }
  +             
  +             server.persistenceManager.initQueue(destination, queueId);
  +             
  +     }
  +
  +     public void destoryDurableSubscription(String clientId, String 
subscriptionName) throws JMSException
  +     {
  +             if( !isTopic ) 
  +                     throw new JMSException("Not a valid operation on a Queue");
  +
  +             String queueId = 
durableSubscriptionToQueueId(clientId,subscriptionName);
  +             synchronized (exclusiveQueues) {                
  +                     exclusiveQueues.remove(queueId);
  +             }
  +             server.persistenceManager.destroyQueue(destination, queueId);
  +             
  +     }
  +
  +     static public String durableSubscriptionToQueueId(String clientId, String 
subscriptionName) 
  +     {
  +             return clientId+"-"+subscriptionName;
  +     }
  +
   }
  
  
  
  1.10      +16 -25    spyderMQ/src/java/org/spydermq/server/StartServer.java
  
  Index: StartServer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/StartServer.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- StartServer.java  2000/12/26 04:16:00     1.9
  +++ StartServer.java  2000/12/27 17:02:21     1.10
  @@ -27,6 +27,7 @@
   import java.util.Set;
   import java.util.LinkedList;
   import java.util.Iterator;
  +import java.util.Enumeration;
   
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
  @@ -36,8 +37,8 @@
   import org.spydermq.SpyTopicConnectionFactory;
   import org.spydermq.xml.XElement;
   import org.spydermq.persistence.SpyTxLog;
  -import org.spydermq.security.UserManager;
   
  +
   /**
    *   Class used to start a JMS service.  This can be called from inside another
    
  @@ -48,7 +49,7 @@
    *   @author Vincent Sheffer ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.9 $
  + *   @version $Revision: 1.10 $
    */
   public class StartServer implements Runnable
   {
  @@ -163,9 +164,10 @@
                        theServer.serverConfig = serverCfg;
   
                        //Create a UserManager object
  -                     UserManager userManager=new UserManager();
  +                     UserManager userManager=new UserManager(theServer, 
serverCfg.getElement("UserManager"));
                        theServer.userManager = userManager;
   
  +                     //Creatye a PersistenceManager object
                        PersistenceManager persistenceManager = new 
PersistenceManager(theServer, serverCfg.getElement("PersistenceManager"));
                        theServer.persistenceManager = persistenceManager;
                        
  @@ -174,9 +176,9 @@
                        //create the known topics
                        Context subcontext=ctx.createSubcontext("topic");
                        
  -                     Iterator iter = serverCfg.getElementsNamed("Topic");
  -                     while( iter.hasNext() ) {
  -                             XElement element = (XElement)iter.next();
  +                     Enumeration enum = serverCfg.getElementsNamed("Topic");
  +                     while( enum.hasMoreElements() ) {
  +                             XElement element = (XElement)enum.nextElement();
                                String name = element.getField("Name");
   
                                Topic t=theServer.newTopic(name);
  @@ -187,36 +189,25 @@
                        //create the known queues
                        subcontext=ctx.createSubcontext("queue");
                        
  -                     iter = serverCfg.getElementsNamed("Queue");
  -                     while( iter.hasNext() ) {
  -                             XElement element = (XElement)iter.next();
  +                     enum = serverCfg.getElementsNamed("Queue");
  +                     while( enum.hasMoreElements() ) {
  +                             XElement element = (XElement)enum.nextElement();
                                String name = element.getField("Name");
   
                                Queue q=theServer.newQueue(name);
                                subcontext.rebind(name,q);
                        }
  -
   
  -                     //Set the known Ids
  -                     iter = serverCfg.getElementsNamed("User");
  -                     while( iter.hasNext() ) {
  -                             XElement element = (XElement)iter.next();
  -                             String name = element.getField("Name");
  -                             String passwd = element.getField("Password");
  -                             if( element.containsField("Id") ) {
  -                                     
userManager.addUser(name,passwd,element.getField("Id"));
  -                             } else {
  -                                     userManager.addUser(name,passwd,null);
  -                             }
  -                     }
  +                     // Resubscribe the durable subscriptions
  +                     userManager.initDurableSubscriptions();
   
  -                     // Restore the persistent messages to thie queues.
  +                     // Restore the persistent messages to thier queues.
                        theServer.persistenceManager.restore();
   
  -                     iter = serverCfg.getElementsNamed("InvocationLayer");
  -                     while( iter.hasNext() ) {
  +                     enum = serverCfg.getElementsNamed("InvocationLayer");
  +                     while( enum.hasMoreElements()) {
                                
  -                             XElement element = (XElement)iter.next();
  +                             XElement element = (XElement)enum.nextElement();
                                String name = element.getField("Name");
                                String topicConnectionFactoryJNDI = 
element.getField("TopicConnectionFactoryJNDI");
                                String queueConnectionFactoryJNDI = 
element.getField("QueueConnectionFactoryJNDI");
  
  
  
  1.5       +24 -0     spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/PersistenceManager.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- PersistenceManager.java   2000/12/24 01:55:06     1.4
  +++ PersistenceManager.java   2000/12/27 17:02:22     1.5
  @@ -26,7 +26,7 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class PersistenceManager {
   
  @@ -305,6 +305,30 @@
                        LogInfo info = new LogInfo(log, dest, queueId);
                        
                        messageLogs.put(""+dest+"-"+queueId, info);
  +
  +             } catch (javax.jms.JMSException e) {
  +                     throw e;
  +             } catch (Exception e) {
  +                     javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
  +                     newE.setLinkedException(e);
  +                     throw newE;
  +             }
  +
  +     }
  +
  +     public void destroyQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  +
  +             try {
  +
  +                     URL logFile = new URL(dataDirectory, 
dest.toString()+"-"+queueId+".dat");
  +                     java.io.File file = new java.io.File(logFile.getFile());
  +     
  +                     SpyMessageLog log = 
(SpyMessageLog)messageLogs.remove(""+dest+"-"+queueId);
  +                     if( log == null )
  +                             throw new JMSException("The persistence log was never 
initialized");
  +                     log.close();
  +
  +                     file.delete();
   
                } catch (javax.jms.JMSException e) {
                        throw e;
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/UserManager.java
  
  Index: UserManager.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import java.util.Hashtable;
  import java.util.HashSet;
  import java.util.Iterator;
  import java.util.Collection;
  import java.util.Enumeration;
  
  import javax.jms.JMSException;
  
  import org.spydermq.xml.XElement;
  import org.spydermq.Log;
  import org.spydermq.SpyTopic;
  import org.spydermq.server.JMSServer;
  
  /**
   * This class is a simple User Manager. It handles credential issues.
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class UserManager
  {
        //Known users hashed by login
        private Hashtable users;
        //registered clientID 
        private HashSet clientID;               
        JMSServer server;
        org.spydermq.xml.XElement userManagerConfig;
  
        
        public void addUser(String login,String passwd,String clientID)
        {
                users.put(login,new Identity(login,passwd,clientID));
        }
        
        public String checkUser(String login,String passwd) throws JMSException
        {
                Identity user=(Identity)users.get(login);               
                if (user==null) throw new JMSException("This user does not exist");    
         
                if (!passwd.equals(user.passwd)) throw new JMSException("Bad 
password");
                
                if (user.clientID!=null) {
                        if (clientID.contains(user.clientID)) throw new 
JMSException("This clientID is already registered !");
                        clientID.add(user.clientID);
                }
                
                return user.clientID;
        }
        
        public void check(String login,String passwd,String clientID)
        {               
                synchronized (users) {
                        users.put(login,new Identity(login,passwd,clientID));
                }
        }
        
        public void addClientID(String ID) throws JMSException
        {
                                
                //Check : this ID must not be registered
                if (clientID.contains(ID)) throw new JMSException("This clientID is 
already registered !");
  
                //Check : this ID must not be password protected
                synchronized (users) {
                        Iterator i=users.values().iterator();
                        if (i!=null) {
                                while (i.hasNext()) {
                                        Identity id=(Identity)i.next();
                                        if (id.clientID!=null)
                                                if (id.clientID.equals(ID))
                                                        throw new JMSException("This 
clientID is password protected !");
                                }
                        }
                }
                
                clientID.add(ID);
        }
        
        public void removeID(String ID)
        {
                clientID.remove(ID);
        }
        
        public class Identity {
                
                String login;
                String passwd;
                String clientID;
                
                Identity(String login,String passwd,String clientID)
                {
                        this.login=login;
                        this.passwd=passwd;
                        this.clientID=clientID;
                }
  
        }
  
        public UserManager(JMSServer server, XElement userManagerConfig) throws 
org.spydermq.xml.XElementException
        {
                this.server=server;
                this.userManagerConfig=userManagerConfig;
                users=new Hashtable();
                clientID=new HashSet();
  
                //Set the known Ids
                Enumeration enum = userManagerConfig.getElementsNamed("User");
                while( enum.hasMoreElements() ) {
                        XElement element = (XElement)enum.nextElement();
                        String name = element.getField("Name");
                        String passwd = element.getField("Password");
                        if( element.containsField("Id") ) {
                                addUser(name,passwd,element.getField("Id"));
                        } else {
                                addUser(name,passwd,null);
                        }
                }
                
        }
  
        public void initDurableSubscriptions() throws 
org.spydermq.xml.XElementException {
  
                //Set the known Ids
                Enumeration enum = 
userManagerConfig.getElementsNamed("User/DurableSubscription");
                while( enum.hasMoreElements() ) {
                        XElement element = (XElement)enum.nextElement();
                        
                        String clientId = element.getField("../Id");
                        String name = element.getField("Name");
                        String topicName = element.getField("TopicName");
  
                        try {
                                
                                Log.log("Restarting Durable Subscription: 
"+clientId+","+name+","+topicName);
                                SpyTopic topic=new SpyTopic(topicName);
  
                                JMSDestination dest = server.getJMSDestination(topic);
                                dest.createDurableSubscription(clientId, name);
  
                        } catch (JMSException e ) {
                                Log.error("Could not initialize a durable subscription 
for : Client Id="+clientId+", Name="+name+", Topic Name="+topicName);
                                Log.error(e);
                        }
                }
                
        }
  
        public void setDurableSubscription(String clientId, String name, SpyTopic 
topic) throws JMSException {
  
                try {
                        //Set the known Ids
                        Enumeration enum = userManagerConfig.getElementsNamed("User");
                        while( enum.hasMoreElements() ) {
  
                                // Match the User.Name
                                XElement user = (XElement)enum.nextElement();
                                if( !user.containsField("Id") || 
!user.getField("Id").equals(clientId) ) 
                                        continue;
  
                                XElement subscription=null;
  
                                // Match the User/DurableSubscription.Name
                                Enumeration enum2 = 
user.getElementsNamed("DurableSubscription");
                                while( enum2.hasMoreElements() ) {
                                        XElement t = (XElement)enum2.nextElement();
                                        if( t.getField("Name").equals(name)) {
                                                subscription = t;
                                                break;
                                        }
                                }
  
                                if( subscription == null ) {
                                        // it was not previously registered...
                                        if( topic == null )
                                                return;
  
                                        subscription = new 
XElement("DurableSubscription");
                                        subscription.addField("Name", name);
                                        subscription.addField("TopicName", 
topic.getName());
                                        user.addElement(subscription);
                                                                                       
 
                                        JMSDestination dest = 
server.getJMSDestination(topic);
                                        dest.createDurableSubscription(clientId, name);
                                        
                                        server.saveConfig();
                                        
                                        
                                } else {
                                        // it was previously registered...
                                        if( 
subscription.getField("TopicName").equals(topic.getName()) ) {
                                                // and it is the same as before, do 
nothing.
                                                return;
                                        } else {
                                                // we have to change the 
subscription...
                                                SpyTopic prevTopic = new SpyTopic( 
subscription.getField("TopicName") );
                                                JMSDestination dest = 
server.getJMSDestination(prevTopic);                                              
                                                
dest.destoryDurableSubscription(clientId, name);
  
                                                if( topic == null ) {
                                                        
subscription.removeFromParent();
                                                } else {
                                                        
subscription.setField("TopicName", topic.getName());
                                                        dest = 
server.getJMSDestination(topic);
                                                        
dest.createDurableSubscription(clientId, name);
                                                }
                                                
                                                server.saveConfig();
                                        }
                                }
                                
                                return;
                        }
                } catch ( java.io.IOException e ) {
                        JMSException newE = new JMSException("Could not setup the 
durable subscription");
                        newE.setLinkedException(e);
                        throw newE;
                } catch ( org.spydermq.xml.XElementException e ) {
                        JMSException newE = new JMSException("Could not setup the 
durable subscription");
                        newE.setLinkedException(e);
                        throw newE;
                }
                
        }
  }
  
  
  

Reply via email to