User: hiram   
  Date: 00/12/11 21:58:47

  Modified:    src/java/org/spydermq/server SpyderMQServiceMBean.java
                        SpyderMQService.java StartServer.java
  Added:       src/java/org/spydermq/server JMSServerQueueReceiver.java
                        InvocationLayerFactory.java JMSServerQueue.java
                        JMSServerMBean.java JMSServer.java
  Removed:     src/java/org/spydermq/server RemoteControl.java Main.java
                        RemoteControlImpl.java
  Log:
  Several Changes:
  - Invocation layer simplified by joining the DistributedQueueConnectionFactory and 
DistributedTopicConnectionFactory into DistributedConnectionFactory
  - Separated server code from client code (server code moved to org.spydermq.server)
  - All publish() calls are now sync to the provider.
  - Added a Transaction class to better represent a commit/rollback request to the 
server.
  - Now have an InvocationLayerFactory so that we can potentially load multiple 
invocation layers (OIL/UIL/RMI) at the same time.
  
  Revision  Changes    Path
  1.2       +25 -2     spyderMQ/src/java/org/spydermq/server/SpyderMQServiceMBean.java
  
  Index: SpyderMQServiceMBean.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/SpyderMQServiceMBean.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyderMQServiceMBean.java 2000/11/29 00:19:39     1.1
  +++ SpyderMQServiceMBean.java 2000/12/12 05:58:43     1.2
  @@ -7,19 +7,42 @@
   
   package org.spydermq.server;
   
  +/*
  + * jBoss, the OpenSource EJB server
  + *
  + * Distributable under GPL license.
  + * See terms of license at gnu.org.
  + */
  +
  +/*
  + * jBoss, the OpenSource EJB server
  + *
  + * Distributable under GPL license.
  + * See terms of license at gnu.org.
  + */
  +
   /**
    *   <description> 
    * MBean interface for the SpyderMQ JMX service.
    *      
    *   @see <related>
    *   @author Vincent Sheffer ([EMAIL PROTECTED])
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public interface SpyderMQServiceMBean
      extends org.jboss.util.ServiceMBean
   {
      // Constants -----------------------------------------------------
      public static final String OBJECT_NAME = ":service=SpyderMQ";
  -    
  +     
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +     
  +   // Public --------------------------------------------------------
  +
  +     
      // Public --------------------------------------------------------
   }
  
  
  
  1.2       +73 -73    spyderMQ/src/java/org/spydermq/server/SpyderMQService.java
  
  Index: SpyderMQService.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/SpyderMQService.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyderMQService.java      2000/11/29 00:19:39     1.1
  +++ SpyderMQService.java      2000/12/12 05:58:43     1.2
  @@ -28,88 +28,88 @@
    *   @author Vincent Sheffer ([EMAIL PROTECTED])
    *   @author <a href="mailto:[EMAIL PROTECTED]">Juha Lindfors</a>
    *
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class SpyderMQService
      extends ServiceMBeanSupport
      implements SpyderMQServiceMBean, MBeanRegistration
   {
      // Constants -----------------------------------------------------
  -    
  +     
      // Attributes ----------------------------------------------------
   
  -    MBeanServer mBeanServer = null;
  -    Object spyderMQServer = null;
  +     MBeanServer mBeanServer = null;
  +     Object spyderMQServer = null;
   
  -    public SpyderMQService() {
  -    }
  -    
  -    // Public --------------------------------------------------------
  -    public ObjectName getObjectName(MBeanServer server, ObjectName name)
  -        throws javax.management.MalformedObjectNameException {
  -        this.mBeanServer = server;
  -        return new ObjectName(OBJECT_NAME);
  -    }
  -    
  -    public String getName() {
  -        return "SpyderMQ";
  -    }
  -    
  -    public void startService()
  -        throws Exception {
  -        if (spyderMQServer == null) {
  -            final Log log = this.log;
  -            
  +     public SpyderMQService() {
  +     }
  +     
  +     // Public --------------------------------------------------------
  +     public ObjectName getObjectName(MBeanServer server, ObjectName name)
  +             throws javax.management.MalformedObjectNameException {
  +             this.mBeanServer = server;
  +             return new ObjectName(OBJECT_NAME);
  +     }
  +     
  +     public String getName() {
  +             return "SpyderMQ";
  +     }
  +     
  +     public void startService()
  +             throws Exception {
  +             if (spyderMQServer == null) {
  +                     final Log log = this.log;
  +                     
            try {
  -                log.log("Testing if SpyderMQ is present....");
  -                try {
  -                    spyderMQServer = 
Thread.currentThread().getContextClassLoader().loadClass("org.spydermq.server.StartServer").newInstance();
 
  -                    log.log("OK");
  -                }catch(Exception e) {
  -                    log.log("failed");
  -                    log.log("SpyderMQ wasn't found:");
  -                    log.debug(e.getMessage());
  -                    return;
  -                } 
  -                
  -                Class[]  spyderMQArgsClasses = { MBeanServer.class };
  -                Object[] spyderMQArgs        = { mBeanServer };
  -                
  -                Method startMethod = spyderMQServer.getClass().getMethod("start", 
  -                                                                  
spyderMQArgsClasses);
  -                
  -                Logger.log("Starting SpyderMQ...");
  -                startMethod.invoke(spyderMQServer, spyderMQArgs); 
  -            }
  +                             log.log("Testing if SpyderMQ is present....");
  +                             try {
  +                                     spyderMQServer = 
Thread.currentThread().getContextClassLoader().loadClass("org.spydermq.server.StartServer").newInstance();
 
  +                                     log.log("OK");
  +                             }catch(Exception e) {
  +                                     log.log("failed");
  +                                     log.log("SpyderMQ wasn't found:");
  +                                     log.debug(e.getMessage());
  +                                     return;
  +                             } 
  +                             
  +                             Class[]  spyderMQArgsClasses = { MBeanServer.class };
  +                             Object[] spyderMQArgs        = { mBeanServer };
  +                             
  +                             Method startMethod = 
spyderMQServer.getClass().getMethod("start", 
  +                                                                                    
                                           spyderMQArgsClasses);
  +                             
  +                             Logger.log("Starting SpyderMQ...");
  +                             startMethod.invoke(spyderMQServer, spyderMQArgs); 
  +                     }
                catch (Exception e) {
  -                log.error("SpyderMQ failed");
  -                log.exception(e);
  -            }
  -        }
  -    }
  -    
  -    public void stopService() {
  -        Class [] spyderMQArgsClasses = null;
  -        Method stopMethod = null;
  -        Object [] spyderMQArgs = null;
  -        
  -        if (this.spyderMQServer != null) {
  -            try {
  -                spyderMQArgsClasses = new Class[0];
  -                stopMethod = this.spyderMQServer.getClass().getMethod("stop", 
  -                                                                      
spyderMQArgsClasses);
  -                spyderMQArgs = new Object[0];
  -                
  -                // [FIXME] jpl
  -                //      This causes some error messages on the console so
  -                //      disabled for now
  -                
  -                //stopMethod.invoke(spyderMQServer, spyderMQArgs);
  -                this.spyderMQServer = null;
  -            } catch (Exception e) {
  -                log.error("SpyderMQ failed");
  -                log.exception(e);
  -            }
  -        }
  -    }
  +                             log.error("SpyderMQ failed");
  +                             log.exception(e);
  +                     }
  +             }
  +     }
  +     
  +     public void stopService() {
  +             Class [] spyderMQArgsClasses = null;
  +             Method stopMethod = null;
  +             Object [] spyderMQArgs = null;
  +             
  +             if (this.spyderMQServer != null) {
  +                     try {
  +                             spyderMQArgsClasses = new Class[0];
  +                             stopMethod = 
this.spyderMQServer.getClass().getMethod("stop", 
  +                                                                                    
                                                   spyderMQArgsClasses);
  +                             spyderMQArgs = new Object[0];
  +                             
  +                             // [FIXME] jpl
  +                             //      This causes some error messages on the console 
so
  +                             //      disabled for now
  +                             
  +                             //stopMethod.invoke(spyderMQServer, spyderMQArgs);
  +                             this.spyderMQServer = null;
  +                     } catch (Exception e) {
  +                             log.error("SpyderMQ failed");
  +                             log.exception(e);
  +                     }
  +             }
  +     }
   }
  
  
  
  1.2       +219 -279  spyderMQ/src/java/org/spydermq/server/StartServer.java
  
  Index: StartServer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/StartServer.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- StartServer.java  2000/08/25 02:30:25     1.1
  +++ StartServer.java  2000/12/12 05:58:44     1.2
  @@ -1,313 +1,253 @@
  -package org.spydermq.server;
  -
   /*
    * spyderMQ, the OpenSource JMS implementation
    *
    * Distributable under GPL license.
    * See terms of license at gnu.org.
    */
  -
  -import org.spydermq.JMSServer;
  -import org.spydermq.security.SecurityManager;
  -import org.spydermq.distributed.interfaces.DistributedJMSServer;
  -import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
  -import org.spydermq.distributed.interfaces.DistributedTopicConnectionFactory;
  -import org.spydermq.distributed.interfaces.DistributedQueueConnectionFactory;
  -import org.spydermq.distributed.JMSServerFactory;
  -import org.spydermq.distributed.SpyTopicConnectionFactory;
  -import org.spydermq.distributed.SpyQueueConnectionFactory;
  -import org.spydermq.Log;
  +package org.spydermq.server;
   
   import javax.jms.TopicConnectionFactory;
   import javax.jms.QueueConnectionFactory;
   import javax.jms.JMSException;
   import javax.jms.Topic;
   import javax.jms.Queue;
  -
  -import org.jnp.server.NamingServer;
   import javax.naming.InitialContext;
   import javax.naming.Context;
  -import java.util.Properties;
  -import java.io.InputStream;
  -import java.util.StringTokenizer;
  -
   import javax.management.MBeanServer;
   import javax.management.ObjectName;
   import javax.management.NotCompliantMBeanException;
   import javax.management.MBeanRegistrationException;
   import javax.management.InstanceAlreadyExistsException;
   import javax.management.loading.*;
  +
  +import java.util.Properties;
  +import java.io.InputStream;
  +import java.util.StringTokenizer;
   import java.net.*;
   import java.util.Set;
   import java.util.LinkedList;
   import java.util.Iterator;
   
  +import org.spydermq.security.SecurityManager;
  +import org.spydermq.distributed.interfaces.DistributedJMSServer;
  +import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
  +import org.spydermq.SpyQueueConnectionFactory;
  +import org.spydermq.server.JMSServer;
  +import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
  +import org.spydermq.SpyTopicConnectionFactory;
  +
   /**
  - *  Class used to start a JMS service.  This can be called from
  - *  inside another application to start JMS.
  + *   Class used to start a JMS service.  This can be called from inside another
  + *  application to start the JMS provider.
    *
    *   @author Norbert Lataille ([EMAIL PROTECTED])
  - *   @author Rich Johns ([EMAIL PROTECTED])
  - *      @author Vincent Sheffer ([EMAIL PROTECTED])
  + *  @author Rich Johns ([EMAIL PROTECTED])
  + *  @author Vincent Sheffer ([EMAIL PROTECTED])
  + *   @author Hiram Chirino ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class StartServer implements Runnable
   {
  -    private JMSServer theServer;
  -    private MBeanServer mBeanServer;
  -    private LinkedList serviceList = new LinkedList();
  -
  -    SpyTopicConnectionFactory topicConnectionFactory;
  -    SpyQueueConnectionFactory queueConnectionFactory;
  -    DistributedTopicConnectionFactory distributedTopicConnectionFactory;
  -    DistributedQueueConnectionFactory distributedQueueConnectionFactory;
  -    DistributedJMSServerSetup theDistributedServerSetup;
  -    DistributedJMSServer theDistributedServer;
  -    SecurityManager securityManager;
  -
  -    //DEBUG -- For the RemoteControl object
  -    private static RemoteControlImpl remoteControl;
  -
  -    /**
  -     * Start the JMS server running in it's own thread.
  -     *
  -     * @param  server  An <code>MBeanServer</code> to 
  -     *                 add the JMS related JMX services to.
  -     */    
  -    public void start(MBeanServer server) {
  -        Thread thread = null;
  -
  -        this.mBeanServer = server;
  -        if (thread == null) {
  -            thread = new Thread(this);
  -            thread.start();
  -        }
  -    }
  -
  -    /**
  -     * Stop the JMS service.  This stops the thread and 
  -     * unregisters the JMX services related to JMS.
  -     */
  -    public void stop() {
  -        theServer.stopServer();
  -
  -        /*
  -         * Need to unregister all of the JMS related JMX services.
  -         * TODO:  Don't know how to do this just yet.
  -         */
  -        while (!theServer.isStopped()) {
  -            try {
  -                Thread.sleep(100);
  -            } catch (InterruptedException e) {
  -            }
  -        }
  -
  -        /*
  -         * Now that we know the JMS server has stopped we 
  -         * can unregister the JMS services.
  -         */
  -        unregisterServices();
  -    }
  -
  -    public StartServer()
  -    {
  -    }
  -
  -    /**
  -     * Register a JMX service.  This is a wrapper method for
  -     * <code>MBeanServer.registerMBean</code> that adds the
  -     * service name to a list.  When the JMS service is stopped
  -     * all of the services that SpyderMQ started are unregistered.
  -     *
  -     * @param  obj      The <code>Object</code> to register.
  -     * @param  objName  The name of the service.  It is this name
  -     *                  that gets added to the list of services.
  -     */
  -    private void registerService(Object obj, ObjectName objName) 
  -    throws NotCompliantMBeanException, MBeanRegistrationException,
  -           InstanceAlreadyExistsException {
  -        this.mBeanServer.registerMBean(obj, objName);
  -        if (!serviceList.contains(objName)) {
  -            serviceList.add(objName);
  -        }
  -    }
  -
  -    /**
  -     * Unregister all of the JMX services from the <code>MBeanServer</code>.
  -     */
  -    private void unregisterServices() {
  -        Iterator iter = null;
  -
  -        iter = serviceList.iterator();
  -        while (iter.hasNext()) {
  -            ObjectName objName = null;
  -
  -            objName = (ObjectName) iter.next();
  -            Log.notice("Unregistering '" + objName + "'");
  -            try {
  -                this.mBeanServer.unregisterMBean(objName);
  -            } catch (Exception e) {
  -                Log.error("Cannot unregister the JMS service '" +
  -                          objName + "': " + e.getMessage()); 
  -                Log.error(e); 
  -            }
  -        }
  -    }
  -
  -    public void run() {
  -        Log.notice("SpyderMQ [v0.5]");
  -        
  -        try { 
  -            
  -            //Load the property file
  -            InputStream in = 
getClass().getClassLoader().getResource("spyderMQ.properties").openStream();
  -            Properties cfg=new Properties();
  -            cfg.load(in);
  -            in.close();
  -            
  -            // By default we will start a JNDI Server. We won't
  -            // if user explicitly tells us not to.
  -            String noStartJNDI = (String)cfg.get("DoNotStartJNDI" );
  -            if((noStartJNDI == null) || 
  -               (noStartJNDI.trim().length() == 0)) {
  -                //Start the JNDI server
  -
  -                Log.notice( "Starting JNDI Server (JNP)" );
  -                new org.jnp.server.Main().start();
  -            } else {
  -                // Need to warn user because if they fail to start
  -                // a JNDI server prior to starting spyder, they
  -                // will be confused by the resulting error.
  -                Log.notice("[Warning]: SpyderMQ.properties specifys NOT to start a 
JNDI Server.");
  -                Log.notice("           If a JNDI Server is not running SpyderMQ 
will not start.");
  -            }
  -            
  -            //Get an InitialContext
  -            InitialContext ctx=new InitialContext();
  -            
  -            //Create a SecurityManager object
  -            securityManager=new SecurityManager();
  -            
  -            //Create the JMSServer object
  -            theServer = new JMSServer(securityManager);
  -            theDistributedServerSetup = 
JMSServerFactory.createJMSServer(theServer,cfg);
  -            theDistributedServer=theDistributedServerSetup.createClient();
  -            String 
connectionReceiverCN=(String)cfg.get("ConnectionReceiverClassName");
  -            registerService(theServer, new ObjectName(JMSServer.OBJECT_NAME));
  -            registerService(theDistributedServerSetup, 
  -                            new 
ObjectName("JMS:service=DistributedJMSServerSetup"));
  -            
  -            //Get the Topic properties
  -            String 
topicConnectionFactoryCN=(String)cfg.get("DistributedTopicConnectionFactoryClassName");
  -            if (topicConnectionFactoryCN == null ||
  -                connectionReceiverCN == null) {
  -                throw new RuntimeException("Missing configuration parameters");
  -            }
  -            
  -            //Create the distributedTopicConnectionFactory object                   
 
  -            distributedTopicConnectionFactory = 
(DistributedTopicConnectionFactory)Class.forName(topicConnectionFactoryCN).newInstance();
  -            distributedTopicConnectionFactory.setServer(theDistributedServer);
  -            distributedTopicConnectionFactory.setCRClassName(connectionReceiverCN);
  -            distributedTopicConnectionFactory.setSecurityManager(securityManager);
  -            registerService(distributedTopicConnectionFactory, 
  -                            new 
ObjectName("JMS:service=DistributedTopicConnectionFactory"));
  -            
  -            //Create the topicConnectionFactory object                       
  -            topicConnectionFactory = new 
SpyTopicConnectionFactory(distributedTopicConnectionFactory);
  -            
  -            //create the known topics
  -            Context subcontext=ctx.createSubcontext("topic");
  -            String topics=(String)cfg.get("knownTopics");
  -            
  -            if (topics!=null) {
  -                
  -                StringTokenizer st = new StringTokenizer(topics,", ");
  -                
  -                while (st.hasMoreElements()) {
  -                    String name=(String)st.nextElement();
  -                    Topic t=theServer.newTopic(name);
  -                    subcontext.rebind(name,t);
  -                }
  -                
  -            } else Log.notice("Warning: no known Topics !"); 
  -            
  -            //Get the queue properties
  -            String 
queueConnectionFactoryCN=(String)cfg.get("DistributedQueueConnectionFactoryClassName");
  -            if (queueConnectionFactoryCN==null) throw new RuntimeException("Missing 
configuration parameter");
  -            
  -            //Create the distributedTopicConnectionFactory object                   
 
  -            distributedQueueConnectionFactory = 
(DistributedQueueConnectionFactory)Class.forName(queueConnectionFactoryCN).newInstance();
  -            distributedQueueConnectionFactory.setServer(theDistributedServer);
  -            distributedQueueConnectionFactory.setCRClassName(connectionReceiverCN);
  -            distributedQueueConnectionFactory.setSecurityManager(securityManager);
  -            registerService(distributedQueueConnectionFactory, 
  -                            new 
ObjectName("JMS:service=DistributedQueueConnectionFactory"));
  -            
  -            //Create the topicConnectionFactory object                       
  -            queueConnectionFactory = new 
SpyQueueConnectionFactory(distributedQueueConnectionFactory);
  -            
  -            //create the known queues
  -            subcontext=ctx.createSubcontext("queue");
  -            String queues=(String)cfg.get("knownQueues");
  -            
  -            if (queues!=null) {
  -                
  -                StringTokenizer st = new StringTokenizer(queues,", ");
  -                
  -                while (st.hasMoreElements()) {
  -                    String name=(String)st.nextElement();
  -                    Queue q=theServer.newQueue(name);
  -                    subcontext.rebind(name,q);
  -                }
  -                
  -            } else {
  -                Log.notice("Warning: no known Queues !"); 
  -            }
  -            
  -            //Set the known Ids
  -            String ids=(String)cfg.get("knownIds");
  -            
  -            if (ids!=null) {
  -                
  -                StringTokenizer st = new StringTokenizer(ids,", ");
  -                
  -                while (st.hasMoreElements()) {
  -                    String read=(String)st.nextElement();
  -                    int pos=read.indexOf(':');
  -                    if (pos==-1) throw new JMSException("Bad configuration file 
(missing separator in knownIds)");
  -                    String name=read.substring(0,pos);
  -                    String passwd=read.substring(pos+1);
  -                    pos=passwd.indexOf(':');
  -                    
  -                    if (pos==-1) {
  -                        Log.log("[JMSServer] new user : Login = "+name+", Id = 
[none]");                                                     
  -                        securityManager.addUser(name,passwd,null);
  -                    } else {
  -                        String ID=passwd.substring(pos+1);
  -                        Log.log("[JMSServer] new user : Login = "+name+", Id = 
"+ID);                                                        
  -                        securityManager.addUser(name,passwd.substring(0,pos),ID);
  -                    }
  -                    
  -                }
  -                
  -            } else {
  -                Log.notice("Warning: no known Ids !"); 
  -            }
  -            
  -            //(re)bind the connection factories in the JNDI namespace
  -            ctx.rebind("TopicConnectionFactory",topicConnectionFactory);
  -            ctx.rebind("QueueConnectionFactory",queueConnectionFactory);
  -            
  -            //DEBUG -- Create a RemoteControl object
  -            //Administration calls will soon be done using JMX.
  -            remoteControl=new RemoteControlImpl(theServer);
  -            ctx.rebind("RemoteControl",remoteControl);
  -            
  -        } catch (Exception e) { 
  -            Log.error("Cannot start the JMS server ! "+e.getMessage()); 
  -            Log.error(e); 
  -        } 
  -        
  -    }
  +     private JMSServer theServer;
  +     private MBeanServer mBeanServer;
  +     private LinkedList serviceList = new LinkedList();
  +     SecurityManager securityManager;
  +
  +     /**
  +      * Start the JMS server running in it's own thread.
  +      *
  +      * @param  server  An <code>MBeanServer</code> to
  +      *                 add the JMS related JMX services to.
  +      */
  +     public void start(MBeanServer server) {
  +             Thread thread = null;
  +
  +             this.mBeanServer = server;
  +             if (thread == null) {
  +                     thread = new Thread(this);
  +                     thread.start();
  +             }
  +     }
  +
  +     /**
  +      * Stop the JMS service.  This stops the thread and
  +      * unregisters the JMX services related to JMS.
  +      */
  +     public void stop() {
  +             theServer.stopServer();
  +
  +             /*
  +              * Need to unregister all of the JMS related JMX services.
  +              * TODO:  Don't know how to do this just yet.
  +              */
  +             while (!theServer.isStopped()) {
  +                     try {
  +                             Thread.sleep(100);
  +                     } catch (InterruptedException e) {
  +                     }
  +             }
  +
  +             /*
  +              * Now that we know the JMS server has stopped we
  +              * can unregister the JMS services.
  +              */
  +             unregisterServices();
  +     }
  +
  +     public StartServer()
  +     {
  +     }
  +
  +     /**
  +      * Register a JMX service.  This is a wrapper method for
  +      * <code>MBeanServer.registerMBean</code> that adds the
  +      * service name to a list.  When the JMS service is stopped
  +      * all of the services that SpyderMQ started are unregistered.
  +      *
  +      * @param  obj      The <code>Object</code> to register.
  +      * @param  objName  The name of the service.  It is this name
  +      *                  that gets added to the list of services.
  +      */
  +     private void registerService(Object obj, ObjectName objName)
  +     throws NotCompliantMBeanException, MBeanRegistrationException,
  +     InstanceAlreadyExistsException {
  +             this.mBeanServer.registerMBean(obj, objName);
  +             if (!serviceList.contains(objName)) {
  +                     serviceList.add(objName);
  +             }
  +     }
  +
  +     /**
  +      * Unregister all of the JMX services from the <code>MBeanServer</code>.
  +      */
  +     private void unregisterServices() {
  +             Iterator iter = null;
  +
  +             iter = serviceList.iterator();
  +             while (iter.hasNext()) {
  +                     ObjectName objName = null;
  +
  +                     objName = (ObjectName) iter.next();
  +                     System.out.println("Unregistering '" + objName + "'");
  +                     try {
  +                             this.mBeanServer.unregisterMBean(objName);
  +                     } catch (Exception e) {
  +                             System.err.println("Cannot unregister the JMS service 
'" +
  +                             objName + "': " + e.getMessage());
  +                             System.err.println(e);
  +                     }
  +             }
  +     }
  +
  +     public void run() {
  +             System.out.println("SpyderMQ [v0.5]");
  +
  +             try {
  +
  +                     //Load the property file
  +                     InputStream in = 
getClass().getClassLoader().getResource("spyderMQ.properties").openStream();
  +                     Properties cfg=new Properties();
  +                     cfg.load(in);
  +                     in.close();
  +
  +                     //Get an InitialContext
  +                     InitialContext ctx=new InitialContext();
  +
  +                     //Create a SecurityManager object
  +                     securityManager=new SecurityManager();
  +
  +                     //Create the JMSServer object
  +                     theServer = new JMSServer(securityManager);
  +                     registerService(theServer, new 
ObjectName(JMSServer.OBJECT_NAME));
  +
  +                     //create the known topics
  +                     Context subcontext=ctx.createSubcontext("topic");
  +                     String topics=(String)cfg.get("knownTopics");
  +
  +                     if (topics!=null) {
  +
  +                             StringTokenizer st = new StringTokenizer(topics,", ");
  +
  +                             while (st.hasMoreElements()) {
  +                                     String name=(String)st.nextElement();
  +                                     Topic t=theServer.newTopic(name);
  +                                     subcontext.rebind(name,t);
  +                             }
  +
  +                     } else System.out.println("Warning: no known Topics !");
  +
  +                     //create the known queues
  +                     subcontext=ctx.createSubcontext("queue");
  +                     String queues=(String)cfg.get("knownQueues");
  +
  +                     if (queues!=null) {
  +
  +                             StringTokenizer st = new StringTokenizer(queues,", ");
  +
  +                             while (st.hasMoreElements()) {
  +                                     String name=(String)st.nextElement();
  +                                     Queue q=theServer.newQueue(name);
  +                                     subcontext.rebind(name,q);
  +                             }
  +
  +                     } else {
  +                             System.out.println("Warning: no known Queues !");
  +                     }
  +
  +                     //Set the known Ids
  +                     String ids=(String)cfg.get("knownIds");
  +
  +                     if (ids!=null) {
  +
  +                             StringTokenizer st = new StringTokenizer(ids,", ");
  +
  +                             while (st.hasMoreElements()) {
  +                                     String read=(String)st.nextElement();
  +                                     int pos=read.indexOf(':');
  +                                     if (pos==-1) throw new JMSException("Bad 
configuration file (missing separator in knownIds)");
  +                                     String name=read.substring(0,pos);
  +                                     String passwd=read.substring(pos+1);
  +                                     pos=passwd.indexOf(':');
  +
  +                                     if (pos==-1) {
  +                                             //System.out.println("[JMSServer] new 
user : Login = "+name+", Id = [none]");
  +                                             
securityManager.addUser(name,passwd,null);
  +                                     } else {
  +                                             String ID=passwd.substring(pos+1);
  +                                             //System.out.println("[JMSServer] new 
user : Login = "+name+", Id = "+ID);
  +                                             
securityManager.addUser(name,passwd.substring(0,pos),ID);
  +                                     }
  +
  +                             }
  +
  +                     } else {
  +                             System.out.println("Warning: no known Ids !");
  +                     }
  +
  +                     //Set up the transports for the server
  +                     InvocationLayerFactory invocationLayerFactory= new 
InvocationLayerFactory();
  +                     invocationLayerFactory.distributedJMSServerClassName = 
(String)cfg.get("DistributedJMSServerClassName");
  +                     invocationLayerFactory.connectionReceiverClassName = 
(String)cfg.get("ConnectionReceiverClassName");
  +                     invocationLayerFactory.distributedConnectionFactoryClassName = 
(String)cfg.get("DistributedConnectionFactoryClassName");
  +
  +                     invocationLayerFactory.createObjects(theServer);
  +
  +                     
registerService(invocationLayerFactory.distributedJMSServerSetup,
  +                     new ObjectName("JMS:service=DistributedJMSServerSetup"));
  +
  +                     
registerService(invocationLayerFactory.distributedConnectionFactory,
  +                     new ObjectName("JMS:service=DistributedConnectionFactory"));
  +
  +                     //(re)bind the connection factories in the JNDI namespace
  +                     
ctx.rebind("TopicConnectionFactory",invocationLayerFactory.spyTopicConnectionFactory);
  +                     
ctx.rebind("QueueConnectionFactory",invocationLayerFactory.spyQueueConnectionFactory);
  +
  +
  +             } catch (Exception e) {
  +                     System.err.println("Cannot start the JMS server ! 
"+e.getMessage());
  +                     System.err.println(e);
  +             }
  +
  +     }
   }
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/server/JMSServerQueueReceiver.java
  
  Index: JMSServerQueueReceiver.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.DeliveryMode;
  
  import org.spydermq.*;
  import org.spydermq.distributed.interfaces.ConnectionReceiver;
  import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
  
  import java.util.HashMap;
  import java.util.Iterator;
  import java.io.Serializable;
  
  /**
   *    This class manages a connectionreceiver for a JMSServerQueue.
   *  Keeps track of listening state and unacknowleged messages.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class JMSServerQueueReceiver implements Serializable {
  
        // The queue messages will be comming from
        public JMSServerQueue jmsSeverQueue;
        // The connection mewssages will be going to
        public SpyDistributedConnection dc;
        // Used to know if the connection is accepting messages.
        public boolean listeners;
        public int receiveReuquests;
        // Keeps track of the unacknowledged messages send to the connection.
        public transient HashMap unacknowledgedMessages;
  
  
        
        // Consturctor ---------------------------------------------------
        public JMSServerQueueReceiver(JMSServerQueue serverQueue, 
SpyDistributedConnection sdc) {
                jmsSeverQueue = serverQueue;
                dc = sdc;
                listeners = false;
                receiveReuquests = 0;
                unacknowledgedMessages = new HashMap();
  
                Log.log("A ServerQueueReceiver has been created for : " + 
serverQueue.destination + "/" + dc.getClientID());
  
        }
  
        
        void acknowledge(String messageId, boolean ack) throws javax.jms.JMSException {
                SpyMessage m;
                synchronized (unacknowledgedMessages) {
                        m = (SpyMessage) unacknowledgedMessages.remove(messageId);
                }
  
                if (m == null)
                        return;
                        
                if (jmsSeverQueue.isTopic) {
                        // Not sure how we should handle the topic case.
                        // On a negative acknowledge, we don't want to
                        // add it back to the topic since other
                        // receivers might get a duplicate duplicate message.
                } else {                        
                        // Was it a negative acknowledge??
                        if (!ack) {
                                Log.log("Restoring message: " + m.messageId);
                                jmsSeverQueue.restoreMessage(m);
                        } else {
                                
                                if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT 
) {
                                        
jmsSeverQueue.spyMessageLog.logRemoveMessage(m);
                                        jmsSeverQueue.spyMessageLog.commit();
                                }
                                
                                Log.log("Message Ack: " + m.messageId);
                        }
                }
  
        }
  
  
        // The connection is accepting new messages if there
        // is a listener or if the receiver has requested a message.    
        public boolean isListening() {
                return listeners || receiveReuquests > 0;
        }
  
        // Called when we send one message.
        public void removeReceiver() {
                if (receiveReuquests == 0)
                        return;
  
                receiveReuquests--;
                if (receiveReuquests == 0 && !listeners) {
                        jmsSeverQueue.listeners--;
                }
  
        }
  
        // This method gets invoked as a result of a receive() method call
        // on a QueueReceiver
        public void addReceiver(long wait) {
  
                // TODO: figure out a way to make a reciver eligable for
                // wait amount of time.
  
                receiveReuquests++;
                if (receiveReuquests == 1 && !listeners) {
                        jmsSeverQueue.listeners++;
                }
  
        }
  
        public void setListening(boolean value) {
                if (value == listeners)
                        return;
  
                listeners = value;
                if (value && receiveReuquests == 0)
                        jmsSeverQueue.listeners++;
  
                if (!value && receiveReuquests == 0)
                        jmsSeverQueue.listeners--;
        }
  
                        
        void sendMultipleMessages(SpyMessage mes[]) throws Exception {
                Log.log("DISPATCH: " + mes.length + " messages => " + 
dc.getClientID());
                if (!jmsSeverQueue.isTopic) { 
                        synchronized (unacknowledgedMessages) {
                                for (int i = 0; i < mes.length; i++) {
                                        
unacknowledgedMessages.put(mes[i].getJMSMessageID(), mes[i]);
                                }
                        }
                }
                dc.cr.receiveMultiple(jmsSeverQueue.destination, mes);
        }
  
        
        
        void sendOneMessage(SpyMessage mes) throws Exception {
                Log.log("DISPATCH: Message: " + mes + " => " + dc.getClientID());
                if (!jmsSeverQueue.isTopic) { 
                        synchronized (unacknowledgedMessages) {
                                unacknowledgedMessages.put(mes.getJMSMessageID(), mes);
                        }
                }
                removeReceiver();
                dc.cr.receive(jmsSeverQueue.destination, mes);
        }
  
  
        
        public void close() {
  
                Log.log("A ServerQueueReceiver has been closed: " + 
jmsSeverQueue.destination + "/" + dc.getClientID());
  
                if (isListening())
                        jmsSeverQueue.listeners--;
  
                synchronized (unacknowledgedMessages) {
                        Iterator iter = unacknowledgedMessages.values().iterator();
                        Log.log("Restoring " + unacknowledgedMessages.size() + " 
messages");
                        while (iter.hasNext()) {
                                SpyMessage m = (SpyMessage) iter.next();
                                jmsSeverQueue.restoreMessage(m);
                                iter.remove();
                        }
                }
        }
        
  }
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/server/InvocationLayerFactory.java
  
  Index: InvocationLayerFactory.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import org.spydermq.Log;
  import org.spydermq.server.JMSServer;
  import org.spydermq.SpyConnection;
  import org.spydermq.SpyDistributedConnection;
  import org.spydermq.SpyQueueConnectionFactory;
  import org.spydermq.SpyTopicConnectionFactory;
  import org.spydermq.distributed.interfaces.ConnectionReceiver;
  import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
  import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
  import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
  import org.spydermq.distributed.interfaces.DistributedJMSServer;
  
  import java.util.Properties;
  import java.rmi.server.UnicastRemoteObject;
  import java.rmi.Remote;
  
  /**
   * Instantiates, by class name, the objects that make up one invocation
   * layer of the server and wires them together.
   *
   * Usage: set the three class-name attributes, call createObjects(), then
   * read the created objects from the remaining attributes.
   */
  public class InvocationLayerFactory
  {

        // Set these attributes before the createObjects() call
        String connectionReceiverClassName;
        String distributedConnectionFactoryClassName;
        String distributedJMSServerClassName;

        // These will be set after the createObjects() call
        DistributedJMSServerSetup distributedJMSServerSetup;
        DistributedJMSServer distributedJMSServer;
        DistributedConnectionFactory distributedConnectionFactory;
        SpyQueueConnectionFactory spyQueueConnectionFactory;
        SpyTopicConnectionFactory spyTopicConnectionFactory;

        /**
         * Creates the server-side setup object, the distributed connection
         * factory and the topic/queue connection factories built on it.
         *
         * @param s the JMS server the created objects are attached to
         * @throws IllegalStateException if one of the class-name attributes
         *         has not been set; the message names the missing ones
         * @throws Exception if a configured class cannot be loaded or
         *         instantiated, or if one of the setup calls fails
         */
        public void createObjects(JMSServer s) throws Exception
        {
                // Fail early, and say which setting is missing, instead of
                // failing later inside Class.forName(null).
                StringBuffer missing = new StringBuffer();
                if (distributedJMSServerClassName == null) {
                        missing.append(" DistributedJMSServerClassName");
                }
                if (distributedConnectionFactoryClassName == null) {
                        missing.append(" DistributedConnectionFactoryClassName");
                }
                if (connectionReceiverClassName == null) {
                        missing.append(" ConnectionReceiverClassName");
                }
                if (missing.length() > 0) {
                        throw new IllegalStateException("Missing configuration parameters:" + missing);
                }

                //Create the server setup object and obtain the server handle from it
                distributedJMSServerSetup = (DistributedJMSServerSetup)Class.forName(distributedJMSServerClassName).newInstance();
                distributedJMSServerSetup.setServer(s);
                distributedJMSServer = distributedJMSServerSetup.createClient();

                //Create the distributedConnectionFactory object
                distributedConnectionFactory = (DistributedConnectionFactory)Class.forName(distributedConnectionFactoryClassName).newInstance();
                distributedConnectionFactory.setServer(distributedJMSServer);
                distributedConnectionFactory.setSecurityManager(s.getSecurityManager());
                distributedConnectionFactory.setConnectionReceiverClassName(connectionReceiverClassName);

                //Create the Topic and Queue Connection Factory objects
                spyTopicConnectionFactory = new SpyTopicConnectionFactory(distributedConnectionFactory);
                spyQueueConnectionFactory = new SpyQueueConnectionFactory(distributedConnectionFactory);
        }

  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/JMSServerQueue.java
  
  Index: JMSServerQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.DeliveryMode;
  
  import org.spydermq.*;
  import org.spydermq.persistence.SpyMessageLog;
  
  import java.util.Iterator;
  import java.util.Hashtable;
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.TreeSet;
  
  /**
   *    This class is a message queue which is stored (hashed by Destination) on the 
JMS provider
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class JMSServerQueue {
        // Attributes ----------------------------------------------------
  
        //the Destination of this queue
        SpyDestination destination;
        //DistributedConnection objs that have "registered" to this Destination
        private HashMap subscribers;
        //List of Pending messages
        private TreeSet messages;
        //Is a thread already working on this queue ? 
        //You cannot start two threads on the same destination (correct order of msgs)
        private boolean threadWorking;
        // Am I already in the task queue ? It is useless to put many times the same 
destination in the task queue.
        private boolean alreadyInTaskQueue;
        //If this is linked to a temporaryDestination, 
temporaryDestination=DistributedConnection of the owner, otherwise it's null
        SpyDistributedConnection temporaryDestination;
        //The JMSServer object
        private JMSServer server;
        //Am I a queue or a topic  
        boolean isTopic;
  
        //Nb of listeners for this Queue
        int listeners;
        //Counter used to number incoming messages. (Used to order the messages.)
        long messageIdCounter = Long.MIN_VALUE;
  
        // Keeps track of the last used connection so that we can do round robin 
distribution of p2p messages.
        private JMSServerQueueReceiver lastUsedQueueReceiver;   
        // Should we use the round robin approach to pick the next receiver of a p2p message?
        private boolean useRoundRobinMessageDistribution = true;
  
        // Used to log the persistent messages.
        SpyMessageLog spyMessageLog;
        
        
        // Constructor ---------------------------------------------------         
        /**
         * Builds the server-side queue for a destination and reloads the
         * messages recorded in its transaction log.
         *
         * @param dest      the destination served by this queue
         * @param temporary owner connection if the destination is temporary, otherwise null
         * @param server    the owning JMSServer
         */
        JMSServerQueue(SpyDestination dest,SpyDistributedConnection temporary,JMSServer server) throws JMSException
        {
                this.server = server;
                this.destination = dest;
                this.temporaryDestination = temporary;
                this.isTopic = (dest instanceof SpyTopic);

                this.subscribers = new HashMap();
                this.messages = new TreeSet();
                this.listeners = 0;
                this.threadWorking = false;
                this.alreadyInTaskQueue = false;

                // One log file per destination, named after the destination.
                this.spyMessageLog = new SpyMessageLog(dest.getName() + "-transaction-log.dat");
                SpyMessage[] recovered = spyMessageLog.rebuildMessagesFromLog();
                int idx = 0;
                while (idx < recovered.length) {
                        SpyMessage old = recovered[idx];
                        restoreMessage(old);
                        // Keep the counter above every id already in use.
                        long following = old.messageId + 1;
                        if (following > messageIdCounter) {
                                messageIdCounter = following;
                        }
                        idx++;
                }
                Log.notice("Restored "+recovered.length+" messages to "+dest.getName()+" from the transaction log");
        }
  
  
        
        // Package protected ---------------------------------------------          
        void addSubscriber(SpyDistributedConnection dc) throws JMSException
        {
                //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
                synchronized (destination) {
                        if 
(temporaryDestination!=null&&!temporaryDestination.equals(dc)) throw new 
JMSException("You cannot subscriber to this temporary destination");
  
                        Object qr = subscribers.get(dc.getClientID());
                        if( qr == null ) {
                                subscribers.put(dc.getClientID(),new 
JMSServerQueueReceiver(this,dc));
                        }
                }
        }
  
  
        
        void removeSubscriber(SpyDistributedConnection dc,Iterator i)
        {
                //We want to avoid removeSubscriber, addSubscriber or sendOneMessage 
to work concurently
                synchronized (destination) {
                        
                        JMSServerQueueReceiver 
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
                        if (qr==null) return;
  
                        qr.close();
                                                
                        if (i==null) {
                                if (subscribers.remove(dc.getClientID())==null) 
Log.notice("WARNING: Could not remove "+dc.getClientID());
                        } else {
                                i.remove();
                        }
  
                }
        }
  
  
        
        public void addMessage(SpyMessage mes) throws JMSException
        {
                //Add a message to the message list... 
                synchronized (messages) 
                {                       
                        //Add the message to the queue
                        //messages is now an ordered tree. The order depends 
                        //first on the priority and then on messageId
                        mes.messageId = messageIdCounter++;
  
                        if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
                                spyMessageLog.logAddMessage(mes);
                                spyMessageLog.commit();
                        }
                        
                        messages.add(mes);
                        
                        if (isTopic) {
                                //if a thread is already working on this destination, 
I don't have to myself to the taskqueue
                                if (!threadWorking) notifyWorkers();
                        } else {
                                if (listeners!=0&&!threadWorking) notifyWorkers();
                        }
  
                }
        }
  
  
        
        //Clear the message queue
        synchronized SpyMessage[] startWork()
        {
                if (threadWorking) throw new RuntimeException("One thread is already 
working !");
                
                synchronized (messages) {
                        SpyMessage[] mes=new SpyMessage[messages.size()];
                        mes=(SpyMessage[])messages.toArray(mes);
                        
                        //<DEBUG>
                        messages.clear();
                        //</DEBUG>
                        
                        threadWorking=true;
                        alreadyInTaskQueue=false;
                        return mes;
                }
        }
  
        
                
        /**
         * Marks the queue as being worked on and takes the first pending
         * message out of it.
         *
         * @return the first pending message, or null if none is pending
         */
        synchronized SpyMessage startWorkQueue()
        {
                synchronized (messages) {
                        alreadyInTaskQueue = false;
                        threadWorking = true;

                        if (messages.isEmpty()) {
                                return null;
                        }
                        SpyMessage head = (SpyMessage) messages.first();
                        messages.remove(head);
                        return head;
                }
        }
  
        
                
        /**
         * Marks the queue as no longer being worked on and re-schedules it
         * if messages are still pending (for a queue: only when there is at
         * least one listener).
         */
        void endWork()
        {
                threadWorking = false;

                synchronized (messages) {
                        boolean deliverable = isTopic || listeners != 0;
                        if (deliverable && !messages.isEmpty()) {
                                notifyWorkers();
                        }
                }
        }
  
  
        
        void sendOneMessage(SpyMessage mes)
        {
                //we can only add/remove a subscribers once the message is sent ( 
iterator is fail-fast )
                synchronized (subscribers) {
                        if (subscribers.isEmpty()) return;
                                
                        Iterator i=subscribers.values().iterator();
                                
                        while (i.hasNext()) {
                                JMSServerQueueReceiver 
qr=(JMSServerQueueReceiver)i.next();
                                try {
                                        qr.sendOneMessage(mes);
                                } catch (Exception e) {
                                        Log.notice("Cannot deliver this message to the 
client: "+qr.dc.getClientID());                                  
                                        Log.notice(e);
                                        handleConnectionFailure(qr.dc,i);
                                } 
                        }       
                }
        }
  
  
        
        void sendMultipleMessages(SpyMessage mes[])
        {
                synchronized (subscribers) {
                        if (subscribers.isEmpty()) return;
                                
                        Iterator i=subscribers.values().iterator();
                                
                        while (i.hasNext()) {
                                JMSServerQueueReceiver 
qr=(JMSServerQueueReceiver)i.next();
                                try {
                                        qr.sendMultipleMessages(mes);
                                } catch (Exception e) {
                                        Log.error("Cannot deliver those messages to 
the client "+qr.dc.getClientID());                                  
                                        Log.error(e);
                                        handleConnectionFailure(qr.dc,i);
                                } 
                        }       
                }
        }
        
        //A connection is closing
        void connectionClosing(SpyDistributedConnection dc)
        {
                if (!subscribers.containsKey(dc.getClientID())) return;
                Log.notice("Warning: The DistributedConnection was still registered 
for "+destination);
                removeSubscriber(dc,null);
        }
  
        
                
        void notifyWorkers()
        {
                //It is useless to put many times the same destination in the task 
queue
                if (alreadyInTaskQueue) return;
                
                synchronized (server.taskQueue) {
                        alreadyInTaskQueue=true;
                        server.taskQueue.addLast(this);
                        server.taskQueue.notify();
                }
        }
  
        
                
        private void handleConnectionFailure(SpyDistributedConnection dc,Iterator i)
        {
                //We should try again :) This behavior should under control of a 
Failure-Plugin         
                Log.error("I remove the Connection "+dc.getClientID()+" from the 
subscribers list");
                
                //Call JMSServer.ConnectionClosing(), but ask him not to check my list.
                server.connectionClosing(dc,this);
                
                //remove this connection from the list
                removeSubscriber(dc,i);
        }
                
  
  
                
        void doMyJob() throws JMSException 
        {                       
                if (isTopic) {                  
                
                        //Clear the message queue
                        SpyMessage[] msgs=startWork();
  
                        boolean msgRemoved=false;
                        for( int i=0; i < msgs.length; i++ ) {
                                if( msgs[i].getJMSDeliveryMode() == 
DeliveryMode.PERSISTENT ) {
                                        spyMessageLog.logRemoveMessage(msgs[i]);
                                        msgRemoved=true;
                                }
                        }
                        if( msgRemoved )
                                spyMessageLog.commit();
        
                        //Let the thread do its work
                        if (msgs.length == 1) {
                                
                                if (!msgs[0].isOutdated()) {
                                        sendOneMessage(msgs[0]);
                                }
                                        
                        } else if (msgs.length>1) {
                                //We can send multiple messages
                                sendMultipleMessages(msgs);
                        }
                                
                        //Notify that it has finished its work : another thread can 
start working on this queue
                        endWork();
                        
                } else {
  
                        Log.log("Dispatching messages");
                        
                        synchronized (this) {
                                //In the Queue case, we synchronize on [this] to avoid 
changes (listening modifications)
                                //while we are dispatching messages
                                
                                while (true) {
                                                                        
                                        //At first, find a receiver
                                        //NL: We could find a better receiver (load 
balancing ?)
                                        //HC: Using Round Robin should provide some 
load balancing
                                                                        
                                        //Get the message ( if there is one message 
pending )
                                        SpyMessage mes=startWorkQueue();
                                        if (mes==null) {
                                                Log.log("Done dispatching messages: No 
more message to send");
                                                break;
                                        }
                                        
                                        if (mes.isOutdated()) {
                                                if( mes.getJMSDeliveryMode() == 
DeliveryMode.PERSISTENT ) {
                                                        
spyMessageLog.logRemoveMessage(mes);
                                                        spyMessageLog.commit();
                                                }
                                                continue;
                                        }
                                        
                                        // we may have to restore the 
lastUsedQueueReceiver
                                        // if message on the queue is not sent. (we 
don't want to skip 
                                        // destination in the round robin)
                                        JMSServerQueueReceiver qr;
                                        if( useRoundRobinMessageDistribution ) {
                                                qr=pickNextRoundRobinQueueReceiver();
                                        } else {
                                                qr=pickFirstFoundQueueReceiver();
                                        }
                                        if ( qr == null ) {
                                                restoreMessage(mes);
                                                Log.log("Done dispatching messages: No 
receiver available for dispatch");
                                                break;
                                        }
                                                                                       
                                                                 
                                        //Send the message
                                        try {
                                                qr.sendOneMessage(mes);
                                        } catch (NoReceiverException e) {
                                                
                                                //Log.log(e);
                                                Log.log("Got a NoReceiverException 
from: "+qr.dc.getClientID());
                                                restoreMessage(mes);    
                                                qr.setListening(false);
  
                                        } catch (JMSException e) {
                                                  throw e;
                                        } catch (Exception e) {
                                                //This is a transport failure. We 
should define our own Transport Failure class
                                                //for a better execption catching
                                                
                                                restoreMessage(mes);                   
                         
                                                Log.error("Cannot deliver this message 
to the client "+qr.dc.getClientID());
                                                Log.error(e);
                                                handleConnectionFailure(qr.dc,null);
                                        } 
                                                                        
                                }
                                
                                //Notify that it has finished its work : another 
thread can start working on this queue
                                endWork();
                                                        
                        }
                }
        }
        
  
                
        void connectionListening(boolean mode,SpyDistributedConnection dc) throws 
JMSException 
        {
        
                // Before check                 
                // Synchronized code : We want to avoid sending messages while we are 
changing the connection status 
                synchronized (this) {
                        
                        
                        JMSServerQueueReceiver 
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
                        if (qr==null) throw new JMSException("This 
DistributedConnection is not registered");
  
                        qr.setListening(mode);
                
                        if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue) {
                                synchronized (messages) {
                                        if (!messages.isEmpty()) notifyWorkers();
                                }
                        }
                
                }               
        }       
  
  
        
        public void acknowledge(SpyDistributedConnection dc, String messageId, boolean 
isAck) throws JMSException       {
        
                JMSServerQueueReceiver qr = 
(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
                if( qr==null )
                        throw new JMSException("You have not subscribed to this 
destination.");
                        
                qr.acknowledge(messageId, isAck);
                
        }       
  
        
                
        /**
         * Returns the first receiver, in the iteration order of
         * <code>subscribers</code>, that reports itself as listening.
         *
         * @return a listening receiver, or <code>null</code> when the listener
         *         count is zero or no listening receiver could be found
         */
        private JMSServerQueueReceiver pickFirstFoundQueueReceiver() {

                if (listeners != 0) {
                        for (Iterator it = subscribers.values().iterator(); it.hasNext(); ) {
                                JMSServerQueueReceiver candidate = (JMSServerQueueReceiver) it.next();
                                if (candidate.isListening()) {
                                        return candidate;
                                }
                        }

                        // The counter claimed there were listeners, but none was found.
                        Log.error("FIXME: The listeners count was invalid !");
                }

                return null;
        }
  
                        
        /**
         * Get a JMSServerQueueReceiver object that is listening 
         * to this queue.  If multiple objects are listening to the queue
         * this multiple calls to this method will cycle through them in a round 
         * robin fasion.
         */
        private JMSServerQueueReceiver pickNextRoundRobinQueueReceiver() {
  
                // No valid next connection will exist, return null
                if (listeners == 0)
                        return null;
  
                Iterator i = subscribers.values().iterator();
                JMSServerQueueReceiver firstFoundConnection = null;
                boolean enableSelectNext = false;
  
                while (i.hasNext()) {
                        JMSServerQueueReceiver t = (JMSServerQueueReceiver) i.next();
  
                        // Select the next valid connection if we are past the last 
used connection
                        if (t == lastUsedQueueReceiver || lastUsedQueueReceiver == 
null)
                                enableSelectNext = true;
  
                        // Test to see if the connection is valid pick
                        if (t.isListening()) {
                                // Store the first valid connection since the last 
used might be the last
                                // in the list 
                                if (firstFoundConnection == null)
                                        firstFoundConnection = t;
  
                                // Are we past the last used? then we have the next 
item in the round robin  
                                if (enableSelectNext && t != lastUsedQueueReceiver) {
                                        lastUsedQueueReceiver = t;
                                        return t;
                                }
                        }
                }
  
                // We got here because we did not find a valid item in the list after 
the last
                // used item, so lest use the first valid item 
                if (firstFoundConnection != null) {
                        lastUsedQueueReceiver = firstFoundConnection;
                        return firstFoundConnection;
                } else {
                        Log.error("FIXME: The listeners count was invalid !");
                        return null;
                }
        }       
  
        
        
                
        //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
        SpyMessage queueReceive(long wait, SpyDistributedConnection dc) throws  
JMSException
        {
                
                // Synchronized code : We want to avoid sending messages while we are 
changing the connection status 
                synchronized (this) {
                        JMSServerQueueReceiver 
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
                        if (qr==null) throw new JMSException("This 
DistributedConnection is not registered");
  
                        if( wait < 0 ) {
                                synchronized (messages) {
                                        if (messages.size()==0) return null;
                                        SpyMessage m = (SpyMessage)messages.first();
                                        messages.remove(m);
                                        return m;
                                }
                        } else {
                                
                                qr.addReceiver(wait);
                                
                                if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue) 
{
                                        synchronized (messages) {
                                                if (!messages.isEmpty()) 
notifyWorkers();
                                        }
                                }
                        
                                return null;
                        }
                }
        }       
                
        //Used to put a message that was added previously to the queue, back in the 
queue
        public void restoreMessage(SpyMessage mes) 
        {
                //restore a message to the message list...
                synchronized (messages) {
                        messages.add(mes);      
                        
                        if (isTopic) {
                                //if a thread is already working on this destination, 
I don't have to myself to the taskqueue
                                if (!threadWorking) notifyWorkers();
                        } else {
                                if (listeners!=0&&!threadWorking) notifyWorkers();
                        }
  
                }
        }       
        
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/JMSServerMBean.java
  
  Index: JMSServerMBean.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.JMSException;
  import org.spydermq.*;
  
  /**
   *    Management interface of the JMS server: administrative creation of
   *    topics and queues.
   *
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   *
   *    @version $Revision: 1.1 $
   */
  public interface JMSServerMBean
  {
        /**
         * Creates a new topic.
         *
         * @param name the name of the topic
         * @return the topic that was created
         * @throws JMSException if the topic cannot be created
         */
        SpyTopic newTopic(String name) throws JMSException;

        /**
         * Creates a new queue.
         *
         * @param name the name of the queue
         * @return the queue that was created
         * @throws JMSException if the queue cannot be created
         */
        SpyQueue newQueue(String name) throws JMSException;
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.JMSException;
  import javax.jms.Destination;
  import javax.jms.TemporaryTopic;
  import javax.jms.TemporaryQueue;
  import javax.jms.Topic;
  import javax.jms.Queue;
  
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.Iterator;
  
  import org.spydermq.*;
  import org.spydermq.security.SecurityManager;
  
  /**
   *    This class implements the JMS provider
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class JMSServer 
                implements Runnable, JMSServerMBean
  {
        
        // Constants -----------------------------------------------------
        
        //number of threads in the pool (TO DO: this value should be dynamic)
        final int NB_THREADS=1;
        public static final String OBJECT_NAME = "JMS:service=JMSServer";
  
        // Attributes ----------------------------------------------------
     
        //messages pending for a Destination ( HashMap of JMSServerQueue objects )
        //The map is never modified in place: writers clone it, update the clone
        //and then replace this reference.
        public HashMap messageQueue;
        //list of tasks pending ( linked list of JMSServerQueue objects )
        LinkedList taskQueue; //look when we unregister a temporaryTopic/Queue
        //last id given to a client 
        private int lastID;     
        //last id given to a temporary topic
        private int lastTemporaryTopic; 
        //last id given to a temporary queue
        private int lastTemporaryQueue; 
        //The security manager
        SecurityManager securityManager;
  
        /**
         * <code>true</code> when the server is running.  <code>false</code> when the 
         * server should stop running.
         *
         * Written by the thread calling <code>stopServer()</code> and read by the
         * worker threads, hence <code>volatile</code> so that the change is
         * guaranteed to become visible to them.
         */
        private volatile boolean alive = true;
  
        /**
         * Because there can be a delay between killing the JMS service and the 
         * service actually dying, this field is used to tell external classes
         * that that server has actually stopped.
         *
         * Written by the worker threads and read by external callers through
         * <code>isStopped()</code>, hence <code>volatile</code>.
         */
        private volatile boolean stopped = true;
  
        // Constructor ---------------------------------------------------
     
        /**
         * Creates the server and starts its pool of daemon worker threads.
         *
         * All fields are initialised BEFORE the first worker is started:
         * the workers run <code>run()</code> on this very instance, so starting
         * them earlier would let them observe a partially constructed server.
         *
         * @param securityManager the security manager used to register and
         *                        release client IDs
         */
        public JMSServer(SecurityManager securityManager)
        {
                this.securityManager=securityManager;
                taskQueue=new LinkedList();
                messageQueue=new HashMap();

                lastID=1;
                lastTemporaryTopic=1;
                // Numbered from 1 like the temporary topics (was left at the
                // default 0 before).
                lastTemporaryQueue=1;

                // Start the workers last; Thread.start() publishes everything
                // written above to the new thread.
                for(int i=0;i<NB_THREADS;i++) 
                {
                        Thread worker=new Thread(this);
                        worker.setDaemon(true);
                        worker.setName(Integer.toString(i));
                        worker.start();
                }
        }
  
        /**
         * Returns <code>false</code> if the JMS server is currently
         * running and handling requests, <code>true</code> otherwise.
         *
         * The underlying flag starts out as <code>true</code>, is cleared by a
         * worker thread at the start of each pass through its loop in
         * <code>run()</code> and is set again when a worker leaves
         * <code>run()</code>.
         *
         * @return <code>false</code> if the JMS server is currently
         *         running and handling requests, <code>true</code> 
         *         otherwise.
         */
        public boolean isStopped() {
                return this.stopped;
        }
  
        // Public --------------------------------------------------------
  
        //This is a correct threading system, but this is not ideal... 
        //We should let threads cycle through the JMSServerQueue list, and 
synchronized on the queue they are working on.
        
        public void run() {
                        while (alive) {
                                JMSServerQueue queue = null;                    
                
                                this.stopped = false;
  
                                //Wait (and sleep) until it can find something to do
                                synchronized (taskQueue) {
                                        while (queue == null && alive) {               
                         
                                                
                                                // size() is O(1) in LinkedList... 
                                                int size=taskQueue.size(); 
                                                if (size!=0) { 
                                                        
                                                        //<DEBUG>
                                                        queue = 
(JMSServerQueue)taskQueue.removeFirst();
                                                        
//queue=(JMSServerQueue)taskQueue.getFirst();
                                                        //</DEBUG>
                                                        
                                                        //One other thread can start 
working on the task queue...
                                                        if (size > 1) {
                                                                taskQueue.notify();
                                                        }
                                                } else {        
                                                        try {
                                                                //Log.log("I'm going 
to bed...");
                                                                taskQueue.wait(5000);
                                                                //Log.log("I wake up");
                                                        } catch (InterruptedException 
e) {
                                                        }
                                                }
                                                
                                        }
                                }
                                
                                if (alive) {
                                        //Ask the queue to do its job
                                        try {
                                                queue.doMyJob();
                                        } catch (JMSException e) {
                                                Log.error(e);
                                        }
                                }
                        }
                        Log.log("JMS service stopped.");
                        this.stopped = true;
        }
  
        /**
         * Asks the worker threads to stop.
         *
         * The flag is cleared while holding the task-queue monitor and every
         * worker sleeping in <code>taskQueue.wait()</code> is woken up, so idle
         * workers notice the request immediately instead of only when their
         * wait times out.  A worker that is currently executing a job finishes
         * that job first.  This method does not wait for the workers to end;
         * use <code>isStopped()</code> to find out.
         */
        public void stopServer() {
                synchronized (taskQueue) {
                        this.alive = false;
                        taskQueue.notifyAll();
                }
        }
  
        // Administration calls 
        public SpyTopic newTopic(String name) throws JMSException
        {
                Log.notice("[JMSServer] new topic : "+name);
        
                SpyTopic newTopic=new SpyTopic(name);
                if (messageQueue.containsKey(newTopic)) throw new JMSException("This 
topic already exists !");
                
                JMSServerQueue queue=new JMSServerQueue(newTopic,null,this);
                
                //Add this new JMSServerQueue to the list
                synchronized (messageQueue) {
                        HashMap newMap=(HashMap)messageQueue.clone();
                        newMap.put(newTopic,queue);     
                        messageQueue=newMap;
                }
        
                return newTopic;
        }
  
                
        public SpyQueue newQueue(String name) throws JMSException
        {
                Log.notice("[JMSServer] new queue : "+name);
        
                SpyQueue newQueue=new SpyQueue(name);
                if (messageQueue.containsKey(newQueue)) throw new JMSException("This 
queue already exists !");
                
                JMSServerQueue queue=new JMSServerQueue(newQueue,null,this);
                
                //Add this new JMSServerQueue to the list
                synchronized (messageQueue) {
                        HashMap newMap=(HashMap)messageQueue.clone();
                        newMap.put(newQueue,queue);     
                        messageQueue=newMap;
                }
        
                return newQueue;
        }
  
        // -----------------------------------------
        // Callbacks for the invocation layer ------
        // -----------------------------------------
        
        /**
         * Get a new ClientID for a connection.
         *
         * Candidate IDs of the form <code>"ID" + n</code> are offered to the
         * security manager until one is accepted.  A checked exception from the
         * security manager is taken to mean that the candidate was refused
         * (presumably because it is already in use - confirm against
         * SecurityManager.addClientID) and the next number is tried.  Runtime
         * exceptions are programming or configuration errors and are
         * propagated; retrying them would loop forever.
         *
         * @return the client ID that was registered with the security manager
         */
        public String getID()
        {
                while (true) {
                        int number;
                        // lastID++ is not atomic; several connections may ask for an ID at once.
                        synchronized (this) {
                                number=lastID++;
                        }

                        String ID="ID"+number;
                        try {
                                securityManager.addClientID(ID);
                                return ID;
                        } catch (RuntimeException e) {
                                throw e;
                        } catch (Exception refused) {
                                // candidate refused: try the next number
                        }
                }
        }
        
        public synchronized SpyTopic createTopic(String name) throws JMSException
        {
                Log.log("createTopic("+name+")");
  
                SpyTopic newTopic=new SpyTopic(name);
                if (!messageQueue.containsKey(newTopic)) throw new JMSException("This 
destination does not exist !");
                return newTopic;                
        }
  
        public synchronized SpyQueue createQueue(String name) throws JMSException
        {
                Log.log("createQueue("+name+")");
  
                SpyQueue newQueue=new SpyQueue(name);
                if (!messageQueue.containsKey(newQueue)) throw new JMSException("This 
destination does not exist !");
                return newQueue;
        }
  
        /**
         * Creates a temporary topic named <code>JMS_TT&lt;n&gt;</code> for the
         * given connection and registers a JMSServerQueue for it.
         *
         * @param dc the connection the temporary topic is created for
         * @return the new temporary topic
         * @throws JMSException declared for callers; see SpyTemporaryTopic and
         *                      JMSServerQueue for the actual causes
         */
        public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc)
                throws JMSException
        {
                String topicName = "JMS_TT" + lastTemporaryTopic++;
                SpyTemporaryTopic topic = new SpyTemporaryTopic(topicName, dc);

                synchronized (messageQueue) {
                        JMSServerQueue serverQueue = new JMSServerQueue(topic, dc, this);

                        // Copy-on-write replacement of the destination map.
                        HashMap updated = (HashMap) messageQueue.clone();
                        updated.put(topic, serverQueue);
                        messageQueue = updated;
                }

                return topic;
        }
        
        /**
         * Creates a temporary queue named <code>JMS_TQ&lt;n&gt;</code> for the
         * given connection and registers a JMSServerQueue for it.
         *
         * @param dc the connection the temporary queue is created for
         * @return the new temporary queue
         * @throws JMSException declared for callers; see SpyTemporaryQueue and
         *                      JMSServerQueue for the actual causes
         */
        public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc)
                throws JMSException
        {
                String queueName = "JMS_TQ" + lastTemporaryQueue++;
                SpyTemporaryQueue temporary = new SpyTemporaryQueue(queueName, dc);

                synchronized (messageQueue) {
                        JMSServerQueue serverQueue = new JMSServerQueue(temporary, dc, this);

                        // Copy-on-write replacement of the destination map.
                        HashMap updated = (HashMap) messageQueue.clone();
                        updated.put(temporary, serverQueue);
                        messageQueue = updated;
                }

                return temporary;
        }
  
        /**
         * A connection is closing [error or notification].
         *
         * Releases the client ID of the connection, removes every destination
         * whose <code>temporaryDestination</code> equals the connection, and
         * forwards the notification to all remaining destinations except
         * <code>noCheck</code>.
         *
         * @param dc      the closing connection; nothing is done when <code>null</code>
         * @param noCheck a destination that must not be notified, may be <code>null</code>
         */
        public synchronized void connectionClosing(SpyDistributedConnection dc,
                                                   JMSServerQueue noCheck)
        {
                Log.log("connectionClosing(dc="+dc+",noCheck="+noCheck+")");

                if (dc == null) {
                        return;
                }

                //unregister its clientID
                if (dc.getClientID() != null) {
                        securityManager.removeID(dc.getClientID());
                }

                //Remove the connection from the subscribers list
                synchronized (messageQueue) {

                        // Cloned lazily, only once something has to be removed.
                        HashMap pruned = null;

                        for (Iterator it = messageQueue.values().iterator(); it.hasNext(); ) {
                                JMSServerQueue sq = (JMSServerQueue) it.next();

                                if (dc.equals(sq.temporaryDestination)) {
                                        if (pruned == null) {
                                                pruned = (HashMap) messageQueue.clone();
                                        }
                                        pruned.remove(sq.destination);
                                } else if (sq != noCheck) {
                                        sq.connectionClosing(dc);
                                }
                        }

                        if (pruned != null) {
                                messageQueue = pruned;
                        }
                }
        }
        
        /**
         * Removes a destination from the destination map.  The map is replaced
         * by a modified copy even when the destination was not present.
         *
         * @param dest the destination to remove; must not be <code>null</code>
         */
        public synchronized void deleteTemporaryDestination(SpyDestination dest)
        {
                Log.log("JMSServer: deleteDestination(dest="+dest.toString()+")");

                synchronized (messageQueue) {
                        HashMap remaining = (HashMap) messageQueue.clone();
                        remaining.remove(dest);
                        messageQueue = remaining;
                }
        }
                        
        /**
         * Registers a client ID with the security manager.
         *
         * NOTE(review): despite the name, this does not only check the ID - it
         * calls <code>addClientID</code>; presumably that call fails when the ID
         * is already in use - confirm against SecurityManager.
         *
         * @param ID the client ID to register
         * @throws JMSException declared for callers; the conditions depend on
         *                      the security manager
         */
        public void checkID(String ID) throws JMSException
        {
                securityManager.addClientID(ID);
        }
        
        /**
         * Logs, at error level, that this server instance is being finalized.
         *
         * NOTE(review): does not call <code>super.finalize()</code>.
         */
        public void finalize()
        {
                Log.error("JMSServer.finalize()");
        }       
  
        //Sent by a client to Ack or Nack a message.
        public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem 
item) throws JMSException
        {               
                JMSServerQueue 
serverQueue=(JMSServerQueue)messageQueue.get(item.jmsDestination);
                if (serverQueue==null) throw new JMSException("Destination does not 
exist: "+item.jmsDestination);
        
                serverQueue.acknowledge(dc, item.jmsMessageID, item.isAck);
        }
        
        
        //A connection has sent a new message
        public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws 
JMSException 
        {
        
                Log.notice("INCOMING: "+dc.getClientID()+" => "+val.jmsDestination);   
 
                JMSServerQueue 
queue=(JMSServerQueue)messageQueue.get(val.jmsDestination);
                if (queue==null) throw new JMSException("This destination does not 
exist !");   
                //Add the message to the queue
                queue.addMessage(val);
                
        }       
        
        public void connectionListening(SpyDistributedConnection dc,boolean 
mode,Destination dest) throws JMSException
        {
                JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(dest);
                if (serverQueue==null) throw new JMSException("This destination does 
not exist !");
                
                serverQueue.connectionListening(mode,dc);
        }
        
        /**
         * Returns the security manager this server was constructed with.
         *
         * @return the security manager
         */
        public org.spydermq.security.SecurityManager getSecurityManager() {
                return securityManager;
        }       
  
        //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
        public SpyMessage queueReceive(SpyDistributedConnection dc,Queue queue, long 
wait) throws JMSException
        {
                Log.log("JMSserver: queueReceive(queue="+queue+",wait="+wait+")");
                JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(queue);
                if (serverQueue==null) throw new JMSException("This destination does 
not exist !");
                
                return serverQueue.queueReceive(wait,dc);
                
        }       
        
        //A connection object wants to subscribe to a Destination
        public void subscribe(SpyDistributedConnection dc,Destination dest) throws 
JMSException
        {
                Log.log("Server: 
subscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
                
                JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
                if (queue==null) throw new JMSException("This destination does not 
exist !");
                queue.addSubscriber(dc);
        }               
        
        /**
         * Applies a transaction: first adds all of its messages, then processes
         * all of its acknowledgements.
         *
         * TODO: this has to be performed as a Unit Of Work.  For now it is a
         * quick hack that works most of the time: when one step throws, the
         * steps already applied are not undone.
         *
         * @param dc the connection the transaction comes from
         * @param t  the transaction; its <code>messages</code> and
         *           <code>acks</code> arrays may each be <code>null</code>
         * @throws JMSException from the first message or acknowledgement that fails
         */
        public void transact(SpyDistributedConnection dc, Transaction t)
                throws JMSException
        {
                int messageCount = (t.messages == null) ? 0 : t.messages.length;
                for (int m = 0; m < messageCount; m++) {
                        addMessage(dc, t.messages[m]);
                }

                int ackCount = (t.acks == null) ? 0 : t.acks.length;
                for (int a = 0; a < ackCount; a++) {
                        acknowledge(dc, t.acks[a]);
                }
        }
        
        public void unsubscribe(SpyDistributedConnection dc,Destination dest) throws 
JMSException
        {
                Log.log("Server: 
unsubscribe(dest="+dest.toString()+",idConnection="+dc.getClientID()+")");
                                
                JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
                if (queue==null) throw new JMSException("This destination does not 
exist !");
                queue.removeSubscriber(dc,null);
        }
  
  }
  
  
  

Reply via email to