User: chirino 
  Date: 01/07/27 17:33:38

  Added:       src/main/org/jbossmq/server JMSQueue.java JMSTopic.java
                        QueuedTask.java StartServer.java
  Log:
  Once again many changes.
  - The logic that handled the processing of queue and topic messages
   was seperated our more to make it easier to follow.
  - A QueuedTask class was created to avoid unneeded processing of queues.
  - The interface between the client-server-queues-peristence manager to handel
   DurableSubscription was too verbose, created a DurableSubscripton class and now
   SpyTopics can be inspected to see if they are being used as a DurableSubscription
  - The MBeans that add queues and topics makes it simpler to configure a queue/topic.
  
  Revision  Changes    Path
  1.1                  jbossmq/src/main/org/jbossmq/server/JMSQueue.java
  
  Index: JMSQueue.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.server;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.DeliveryMode;
  
  
  
  
  import java.util.Iterator;
  import java.util.Hashtable;
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.TreeSet;
  
  import org.jbossmq.*;
  
  
  /**
   *    This class is a message queue which is stored (hashed by Destination) on the
   *    JMS provider
   *
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   *
   *    @version $Revision: 1.1 $
   */
  public class JMSQueue extends JMSDestination {           
  
  
  
  
  
  
  
  
  
  
        // Constructor ---------------------------------------------------
        JMSQueue(SpyDestination dest,ClientConsumer temporary,JMSServer server) throws 
JMSException
        {
                super( dest, temporary, server );
  
                exclusiveQueue = new ExclusiveQueue(server);
                // If this is a non-temp queue, then we should persist data
                if( temporaryDestination == null ) {
                        server.getPersistenceManager().initQueue(dest);
                }
  
        }
  
        public void addMessage(SpyMessage mes, Long txId) throws JMSException
        {
  
                cat.debug(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
  
                if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT &&
                        temporaryDestination!=null ) {
                        throw new JMSException("Cannot write a persistent message to a 
temporary destination!");
                }
  
                //Number the message so that we can preserve order of delivery.
                synchronized(this) {
                  mes.messageId = messageIdCounter++;
                }
  
                if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT )
                        server.getPersistenceManager().add(mes, txId);
  
                exclusiveQueue.addMessage(mes, txId);
        }
  
  
  
  
  
  
        public SpyMessage[] browse(String selector) throws JMSException {
                return exclusiveQueue.browse( selector );
        }
  
  
  
  
  
  
  
  
  
  
  
        public String toString() {
                return "JMSDestination:"+destination;
        }
  
  
  
  
  
  
  
  
  
        //list of messages
        ExclusiveQueue exclusiveQueue;
  
        // Package protected ---------------------------------------------
        JMSDestination addConsumer(Subscription sub,ClientConsumer c) throws 
JMSException {
                cat.debug("Adding consumer: "+c+")");
                exclusiveQueue.addConsumer(c);
                return this;
        }
  
        public void destroy() throws JMSException {
                server.getPersistenceManager().destroyQueue(destination);
        }
  
        /**
         * notifyMessageAvailable method comment.
         */
        public void notifyMessageAvailable() {
                exclusiveQueue.notifyMessageAvailable();
        }
  
        /**
         * receiveNoWait method comment.
         */
        public org.jbossmq.SpyMessage receiveNoWait(org.jbossmq.Subscription sub) 
throws javax.jms.JMSException {
                return exclusiveQueue.receiveNoWait(sub);
        }
  
        // Package protected ---------------------------------------------
        void removeConsumer(Subscription sub,ClientConsumer c) throws JMSException {
  
                cat.debug("Removing consumer: "+c+")");
                exclusiveQueue.removeConsumer(c);
                
        }
  
        //Used to put a message that was added previously to the queue, back in the 
queue
        public void restoreMessage(SpyMessage mes)
        {
                cat.debug(""+this+"->restoreMessage(mes="+mes+")");
                synchronized(this) {
                  messageIdCounter = Math.max(messageIdCounter, mes.messageId+1);
                }
                exclusiveQueue.restoreMessage(mes);
        }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/server/JMSTopic.java
  
  Index: JMSTopic.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.server;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.DeliveryMode;
  
  
  
  
  import java.util.Iterator;
  import java.util.Hashtable;
  import java.util.LinkedList;
  import java.util.HashMap;
  import java.util.TreeSet;
  
  import org.jbossmq.*;
  
  
  /**
   *    This class is a message queue which is stored (hashed by Destination) on the
   *    JMS provider
   *
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   *
   *    @version $Revision: 1.1 $
   */
  public class JMSTopic extends JMSDestination {
           
  
  
  
  
  
  
  
        //Hashmap of ExclusiveQueues
        HashMap exclusiveQueues = new HashMap();
        //ShareQueue used for topics
        SharedQueue sharedQueue;
  
        // Constructor ---------------------------------------------------
        JMSTopic(SpyDestination dest,ClientConsumer temporary,JMSServer server) throws 
JMSException
        {
                super( dest, temporary, server );
                sharedQueue = new SharedQueue(server);
        }
  
        public void addMessage(SpyMessage mes, Long txId) throws JMSException
        {
  
                cat.debug(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
  
                if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT &&
                        temporaryDestination!=null ) {
                        throw new JMSException("Cannot write a persistent message to a 
temporary destination!");
                }
  
  
                //Number the message so that we can preserve order of delivery.
                synchronized(this) {
                  mes.messageId = messageIdCounter++;
                }
  
                sharedQueue.addMessage(mes, txId);
  
                synchronized (exclusiveQueues) {
                        if( exclusiveQueues.size() == 0 )
                                return;
  
                        Iterator iter = exclusiveQueues.values().iterator();
                        while( iter.hasNext() ) {                               
                                JMSQueue eq = (JMSQueue)iter.next();
                                SpyMessage clone = mes.myClone();
                                clone.setJMSDestination( eq.destination );
                                eq.addMessage(clone, txId);
                        }
                }
        }
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
        // Package protected ---------------------------------------------
        JMSDestination addConsumer(Subscription sub,ClientConsumer c) throws 
JMSException {
  
                SpyTopic topic = (SpyTopic)sub.destination;
                DurableSubcriptionID id = topic.getDurableSubscriptionID();
                if( id!=null ) {
  
                        server.getStateManager().setDurableSubscription(server, id, 
(SpyTopic)sub.destination);
                        JMSQueue queue = getDurableSubscription(id);
                        if( queue == null ) {
                                
server.getStateManager().setDurableSubscription(server, id, topic);
                                queue = getDurableSubscription(id);
  
                                if( queue == null ) {
                                        throw new JMSException("Could not create a the 
durable subscription.");
                                }
                        }
                                
                        return queue.addConsumer(sub, c);
                                                
                } else {                
                        cat.debug("Adding consumer: "+c);
                        sharedQueue.addConsumer(c);
                        return this;
                }
        }
  
        public void createDurableSubscription(DurableSubcriptionID sub) throws 
JMSException     {
                if( temporaryDestination != null )
                        throw new JMSException("Not a valid operation on a temporary 
topic");
  
                SpyTopic dstopic = new SpyTopic( (SpyTopic)destination, sub);
                
                synchronized (exclusiveQueues) {
                        exclusiveQueues.put(sub, new JMSQueue(dstopic, null, server));
                }
        }
  
        public void destoryDurableSubscription(DurableSubcriptionID id) throws 
JMSException
        {
                synchronized (exclusiveQueues) {
                        ((JMSQueue)exclusiveQueues.remove(id)).destroy();
                }
        }
  
        // Package protected ---------------------------------------------
        JMSQueue getDurableSubscription(DurableSubcriptionID id) {
                synchronized (exclusiveQueues) {
                        return (JMSQueue)exclusiveQueues.get( id );
                }
  
        }
  
        /**
         * notifyMessageAvailable method comment.
         */
        public void notifyMessageAvailable() {
                sharedQueue.notifyMessageAvailable();
        }
  
        /**
         * receiveNoWait method comment.
         */
        public org.jbossmq.SpyMessage receiveNoWait(org.jbossmq.Subscription sub) 
throws javax.jms.JMSException {
                throw new JMSException("Internal Error");
        }
  
        // Package protected ---------------------------------------------
        void removeConsumer(Subscription sub,ClientConsumer c) throws JMSException {
                cat.debug("Removing consumer: "+c);
                sharedQueue.removeConsumer(c);
        }
  
        //Used to put a message that was added previously to the queue, back in the 
queue
        public void restoreMessage(SpyMessage mes)
        {
                cat.debug("Restoring Message: "+mes);
                synchronized(this) {
                  messageIdCounter = Math.max(messageIdCounter, mes.messageId+1);
                }
                
                SpyTopic topic = (SpyTopic)mes.getJMSDestination();
                JMSQueue eq = getDurableSubscription(topic.getDurableSubscriptionID());
                eq.restoreMessage(mes);
                
        }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/server/QueuedTask.java
  
  Index: QueuedTask.java
  ===================================================================
  package org.jbossmq.server;
  
  import EDU.oswego.cs.dl.util.concurrent.Executor;
  
  /**
   * This class allows us to reduce the number of times
   * we add a task to a thread pool.  Most tasks in the
   * server are used to process an object due to some
   * modification of it's state.  If multiple threads modify
   * the state of the object conncurently, The task only
   * needs to run once to process the multiple modifications.
   * 
   * @author: Administrator
   */
  public class QueuedTask implements Runnable {
        Runnable task;
        boolean  isQueued=false;
        static org.apache.log4j.Category cat = 
org.apache.log4j.Category.getInstance(QueuedTask.class);
  
        public QueuedTask( Runnable task ) {
                this.task = task;
        }
        synchronized public void executeWith(Executor e) throws InterruptedException {
                if( !isQueued ) {
                        cat.debug("Task was not queued in the executor previously, 
adding to executor");
                        isQueued = true;
                        e.execute(this);
                } else {
                        cat.debug("Task was queued previously.");
                }
        }
        /**
         * When an object implementing interface <code>Runnable</code> is used 
         * to create a thread, starting the thread causes the object's 
         * <code>run</code> method to be called in that separately executing 
         * thread. 
         * <p>
         * The general contract of the method <code>run</code> is that it may 
         * take any action whatsoever.
         *
         * @see     java.lang.Thread#run()
         */
        public void run() {
                synchronized( this ) {
                        isQueued = false;
                }       
                task.run();
        }
  }
  
  
  
  1.10      +13 -15    jbossmq/src/main/org/jbossmq/server/StartServer.java
  
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to