User: pkendall
  Date: 01/05/15 00:16:48

  Modified:    src/main/org/jbossmq/server StartServer.java
                        PersistenceManager.java JMSDestination.java
  Log:
  Modifications for abstracting out persistence package.
  
  Revision  Changes    Path
  1.3       +24 -14    jbossmq/src/main/org/jbossmq/server/StartServer.java
  
  Index: StartServer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/StartServer.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- StartServer.java  2001/03/02 01:13:02     1.2
  +++ StartServer.java  2001/05/15 07:16:48     1.3
  @@ -40,7 +40,7 @@
   
   /**
    *   Class used to start a JMS service.  This can be called from inside another
  - 
  +
    *   application to start the JMS provider.
    *
    *   @author Norbert Lataille ([EMAIL PROTECTED])
  @@ -48,7 +48,7 @@
    *   @author Vincent Sheffer ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class StartServer implements Runnable
   {
  @@ -143,10 +143,10 @@
        public void run() {
   
                try {
  -             
  +
                        //Load the property file
                        InputStream in = 
getClass().getClassLoader().getResource("jbossmq.xml").openStream();
  -                     XElement serverCfg = XElement.createFrom(in);                  
 
  +                     XElement serverCfg = XElement.createFrom(in);
                        in.close();
   
                        // Make sure that we loaded the right type of xml file
  @@ -166,15 +166,25 @@
                        UserManager userManager=new UserManager(theServer, 
serverCfg.getElement("UserManager"));
                        theServer.userManager = userManager;
   
  -                     //Creatye a PersistenceManager object
  -                     PersistenceManager persistenceManager = new 
PersistenceManager(theServer, serverCfg.getElement("PersistenceManager"));
  +                     //Create a PersistenceManager object
  +                     PersistenceManager persistenceManager;
  +                     XElement pmcfg = serverCfg.getElement("PersistenceManager");
  +                     if( pmcfg.getAttribute("class") == null ) {
  +                             persistenceManager = new 
org.jbossmq.persistence.PersistenceManager(theServer, pmcfg);
  +                     }
  +                     else {
  +                       Class pmc = getClass().forName(pmcfg.getAttribute("class"));
  +                             Class[] types = {theServer.getClass(), 
pmcfg.getClass()};
  +                             Object[] args = {theServer, pmcfg};
  +                             persistenceManager = 
(PersistenceManager)pmc.getConstructor(types).newInstance(args);
  +                     }
                        theServer.persistenceManager = persistenceManager;
  -                     
  +
                        registerService(theServer, new 
ObjectName(JMSServer.OBJECT_NAME));
   
                        //create the known topics
                        Context subcontext=ctx.createSubcontext("topic");
  -                     
  +
                        Enumeration enum = serverCfg.getElementsNamed("Topic");
                        while( enum.hasMoreElements() ) {
                                XElement element = (XElement)enum.nextElement();
  @@ -183,11 +193,11 @@
                                Topic t=theServer.newTopic(name);
                                subcontext.rebind(name,t);
                        }
  -                     
   
  +
                        //create the known queues
                        subcontext=ctx.createSubcontext("queue");
  -                     
  +
                        enum = serverCfg.getElementsNamed("Queue");
                        while( enum.hasMoreElements() ) {
                                XElement element = (XElement)enum.nextElement();
  @@ -205,7 +215,7 @@
   
                        enum = serverCfg.getElementsNamed("InvocationLayer");
                        while( enum.hasMoreElements()) {
  -                             
  +
                                XElement element = (XElement)enum.nextElement();
                                String name = element.getField("Name");
                                String topicConnectionFactoryJNDI = 
element.getField("TopicConnectionFactoryJNDI");
  @@ -228,12 +238,12 @@
   
                                //(re)bind the connection factories in the JNDI 
namespace
                                
ctx.rebind(topicConnectionFactoryJNDI,invocationLayerFactory.spyTopicConnectionFactory);
  -                             
ctx.rebind(queueConnectionFactoryJNDI,invocationLayerFactory.spyQueueConnectionFactory);
                                
  +                             
ctx.rebind(queueConnectionFactoryJNDI,invocationLayerFactory.spyQueueConnectionFactory);
                                
ctx.rebind(xaTopicConnectionFactoryJNDI,invocationLayerFactory.spyXATopicConnectionFactory);
                                
ctx.rebind(xaQueueConnectionFactoryJNDI,invocationLayerFactory.spyXAQueueConnectionFactory);
  -                             
  +
                        }
  -                     
  +
                        System.out.println("Server Version 0.8 Started");
   
   
  
  
  
  1.4       +136 -234  jbossmq/src/main/org/jbossmq/server/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/PersistenceManager.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- PersistenceManager.java   2001/05/13 08:22:00     1.3
  +++ PersistenceManager.java   2001/05/15 07:16:48     1.4
  @@ -8,44 +8,31 @@
   
   import javax.jms.JMSException;
   
  -import java.net.URL;
   import java.util.HashMap;
  -import java.util.TreeSet;
   import java.util.Iterator;
   import java.util.LinkedList;
   
   import org.jbossmq.xml.XElement;
   import org.jbossmq.SpyMessage;
  -import org.jbossmq.persistence.SpyTxLog;
  -import org.jbossmq.persistence.SpyMessageLog;
   import org.jbossmq.SpyDestination;
   import org.jbossmq.SpyDistributedConnection;
   
   /**
  - *   This class manages all persistence related services.
  + *   This class provides the base for user supplied persistence packages.
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
  - * 
  - *   @version $Revision: 1.3 $
  + *  @author Paul Kendall ([EMAIL PROTECTED])
  + *
  + *   @version $Revision: 1.4 $
    */
  -public class PersistenceManager {
  +public abstract class PersistenceManager {
   
  -     // The server this persistence manager is providing service for
  -     JMSServer server;
  -     // The configuration data for the manager.
  -     XElement configElement;
  -     // The directory where persistence data should be stored
  -     URL dataDirectory;
  -     // Log file used to store commited transactions.
  -     SpyTxLog spyTxLog;
  -     // Maps SpyDestinations to SpyMessageLogs
  -     HashMap messageLogs = new HashMap();
        // Maps (Long)txIds to LinkedList of Runnable tasks
        HashMap postCommitTasks = new HashMap();
  -     // Maps Global transactions to local transactions
  -     HashMap globalToLocal = new HashMap();
        // Maps (Long)txIds to LinkedList of Runnable tasks
        HashMap postRollbackTasks = new HashMap();
  +     // Maps Global transactions to local transactions
  +     HashMap globalToLocal = new HashMap();
   
        class GlobalXID implements Runnable {
                SpyDistributedConnection dc;
  @@ -55,292 +42,207 @@
                        this.dc = dc;
                        this.xid = xid;
                }
  -             
  +
                public boolean equals(Object obj)
                {
  -                     if (obj==null) return false;                    
  +                     if (obj==null) return false;
                        if (obj.getClass()!=GlobalXID.class) return false;
                        return ((GlobalXID)obj).xid.equals( xid ) &&
                                   ((GlobalXID)obj).dc.equals(dc);
                }
  -             
  +
                public int hashCode() {
                        return xid.hashCode();
                }
   
                public void run() {
                        synchronized (globalToLocal) {
  -                             globalToLocal.remove(this);                     
  +                             globalToLocal.remove(this);
                        }
                }
        }
   
  -     static class LogInfo {
  -             SpyMessageLog log;
  -             SpyDestination destination;
  -             String queueId;
  -
  -             LogInfo(SpyMessageLog log, SpyDestination destination, String queueId) 
{
  -                     this.log=log;
  -                     this.destination=destination;
  -                     this.queueId=queueId;
  +     /**
  +      * Create and return a unique transaction id.
  +      */
  +     public final Long createTx() throws javax.jms.JMSException {
  +             Long txId = createPersistentTx();
  +             synchronized (postCommitTasks) {
  +                     postCommitTasks.put(txId, new LinkedList());
  +                     postRollbackTasks.put(txId, new LinkedList());
                }
  -             
  +             return txId;
        }
  -             
  +
        /**
  -      * PersistenceManager constructor.
  +      * Create and return a unique transaction id.
  +   *
  +   * Given a distributed connection and a transaction id object,
  +   * allocate a unique local transaction id if the remote id is not already
  +   * known.
         */
  -     public PersistenceManager(JMSServer server, XElement configElement) throws 
javax.jms.JMSException {
  +     public final Long createTx(SpyDistributedConnection dc, Object xid) throws 
javax.jms.JMSException {
   
  -             try {
  +             GlobalXID gxid = new GlobalXID(dc, xid);
  +    synchronized(globalToLocal){
  +      if( globalToLocal.containsKey(gxid) )
  +        throw new JMSException("Duplicate transaction from: "+dc.getClientID()+" 
xid="+xid);
  +    }
   
  -                     this.server = server;
  -                     this.configElement = configElement;
  +             Long txId = createTx();
  +    synchronized(globalToLocal){
  +               globalToLocal.put(gxid, txId);
  +    }
   
  -                     URL configFile = 
getClass().getClassLoader().getResource("jbossmq.xml");
  -                     dataDirectory = new URL(configFile, 
configElement.getField("DataDirectory"));
  -                     URL txLogFile = new URL(dataDirectory, "transactions.dat");
  -                     spyTxLog = new SpyTxLog(txLogFile.getFile());
  -
  -             } catch (Exception e) {
  -                     javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  -             }
  +             //Tasks to remove the global to local mappings on commit/rollback
  +             addPostCommitTask(txId, gxid);
  +             addPostRollbackTask(txId, gxid);
   
  -     }
  +             return txId;
  +  }
  +
  +  /**
  +   * Commit the transaction to the persistent store.
  +   */
  +     public final void commitTx(Long txId) throws javax.jms.JMSException {
  +
  +             LinkedList tasks;
  +             synchronized( postCommitTasks ) {
  +                     tasks = (LinkedList)postCommitTasks.remove(txId);
  +                     postRollbackTasks.remove(txId);
  +             }
  +             if( tasks == null )
  +                     throw new javax.jms.JMSException("Transaction is not active 
for commit.");
   
  -     
  +    commitPersistentTx(txId);
   
  +    synchronized(tasks){
  +      Iterator iter = tasks.iterator();
  +      while( iter.hasNext() ) {
  +        Runnable task = (Runnable)iter.next();
  +        task.run();
  +      }
  +    }
  +  }
   
  -     
  -     public void addPostCommitTask(Long txId, Runnable task) throws 
javax.jms.JMSException {
  +     public final void addPostCommitTask(Long txId, Runnable task) throws 
javax.jms.JMSException {
   
                if( txId == null ) {
                        task.run();
                        return;
                }
  -             
  +
                LinkedList tasks;
                synchronized (postCommitTasks) {
                        tasks = (LinkedList) postCommitTasks.get(txId);
                }
                if (tasks == null)
                        throw new javax.jms.JMSException("Transaction is not active.");
  -
                synchronized (tasks) {
                        tasks.addLast(task);
                }
   
        }
  -
  -     
  -     public void commitTx(Long txId) throws javax.jms.JMSException {
  -
  -             LinkedList tasks;
  -             synchronized( postCommitTasks ) {
  -                     tasks = (LinkedList)postCommitTasks.remove(txId);
  -                     postRollbackTasks.remove(txId);
  -             }
  -             if( tasks == null )
  -                     throw new javax.jms.JMSException("Transaction is not active.");
   
  -             spyTxLog.commitTx(txId);
  -             
  -             synchronized (tasks) {
  -                     Iterator iter = tasks.iterator();
  -                     while( iter.hasNext() ) {
  -                             Runnable task = (Runnable)iter.next();
  -                             task.run();
  -                     }
  -             }
  -                     
  -     }
  -
  -     
  -     public Long createTx() throws javax.jms.JMSException {
  -             Long txId = spyTxLog.createTx();
  -             synchronized (postCommitTasks) {
  -                     postCommitTasks.put(txId, new LinkedList());            
  -                     postRollbackTasks.put(txId, new LinkedList());          
  -             }
  -             return txId;
  -     }
  -
  -     
  -     
  -
  -     
  -     
  -     public void restore() throws javax.jms.JMSException {
  -
  -             TreeSet commitedTXs = spyTxLog.restore();
  -             HashMap clone;
  -             synchronized (messageLogs) {
  -                     clone = (HashMap) messageLogs.clone();
  -             }
  -
  -             Iterator iter = clone.values().iterator();
  -             while (iter.hasNext()) {
  -             
  -                     LogInfo logInfo = (LogInfo)iter.next();                        
 
  -
  -                     JMSDestination q = 
server.getJMSDestination(logInfo.destination);
  -
  -                     SpyMessage rebuild[] = logInfo.log.restore(commitedTXs);
  -
  -                     //TODO: make sure this lock is good enough
  -                     synchronized (q) {
  -                             for (int i = 0; i < rebuild.length; i++) {
  -                                     q.restoreMessage(rebuild[i], logInfo.queueId);
  -                                     q.messageIdCounter = 
Math.max(q.messageIdCounter, rebuild[i].messageId + 1);
  -                             }
  -                     }
  -             }
  -
  -     }
  -     
  -     public void rollbackTx(Long txId) throws javax.jms.JMSException {
  +  /**
  +   * Rollback the transaction.
  +   */
  +     public final void rollbackTx(Long txId) throws javax.jms.JMSException {
   
                LinkedList tasks;
                synchronized( postCommitTasks ) {
                        tasks = (LinkedList)postRollbackTasks.remove(txId);
                        postCommitTasks.remove(txId);
  -             }
  +    }
                if( tasks == null )
  -                     throw new javax.jms.JMSException("Transaction is not active.");
  +                     throw new javax.jms.JMSException("Transaction is not active 
3.");
   
  -             spyTxLog.rollbackTx(txId);
  +    rollbackPersistentTx(txId);
   
  -             synchronized (tasks) {
  -                     Iterator iter = tasks.iterator();
  -                     while( iter.hasNext() ) {
  -                             Runnable task = (Runnable)iter.next();
  -                             task.run();
  -                     }
  -             }
  -                     
  -     }
  -     
  -     public void addPostRollbackTask(Long txId, Runnable task) throws 
javax.jms.JMSException {
  +    synchronized(tasks){
  +      Iterator iter = tasks.iterator();
  +      while( iter.hasNext() ) {
  +        Runnable task = (Runnable)iter.next();
  +        task.run();
  +      }
  +    }
  +
  +  }
   
  +     public final void addPostRollbackTask(Long txId, Runnable task) throws 
javax.jms.JMSException {
  +
                if( txId == null ) {
                        return;
                }
  -             
  +
                LinkedList tasks;
  -             synchronized( postRollbackTasks ) {
  +             synchronized( postCommitTasks ) {
                        tasks = (LinkedList)postRollbackTasks.get(txId);
                }
                if( tasks == null )
  -                     throw new javax.jms.JMSException("Transaction is not active.");
  -
  +                     throw new javax.jms.JMSException("Transaction is not active 
4.");
                synchronized (tasks) {
                        tasks.addLast(task);
                }
  -                     
  -     }
  -
  -     public Long createTx(SpyDistributedConnection dc, Object xid) throws 
javax.jms.JMSException {
   
  -             GlobalXID gxid = new GlobalXID(dc, xid);
  -             if( globalToLocal.containsValue(gxid) ) 
  -                     throw new JMSException("Duplicate transaction from: 
"+dc.getClientID()+" xid="+xid);
  -                     
  -             Long txId = createTx();
  -             globalToLocal.put(gxid, txId);
  -
  -             //Tasks to remove the global to local mappings on commit/rollback
  -             addPostCommitTask(txId, gxid);
  -             addPostRollbackTask(txId, gxid);
  -             
  -             return txId;
        }
   
  -     public Long getPrepared(SpyDistributedConnection dc, Object xid) throws 
javax.jms.JMSException {
  -             
  +  /**
  +   * Return the local transaction id for a distributed transaction id.
  +   */
  +     public final Long getPrepared(SpyDistributedConnection dc, Object xid) throws 
javax.jms.JMSException {
  +
                GlobalXID gxid = new GlobalXID(dc, xid);
  -             Long txid = (Long)globalToLocal.get(gxid);
  -             
  -             if( txid == null )              
  +    Long txid;
  +    synchronized(globalToLocal){
  +               txid = (Long)globalToLocal.get(gxid);
  +    }
  +             if( txid == null )
                        throw new JMSException("Transaction does not exist from: 
"+dc.getClientID()+" xid="+xid);
  -                     
  -             return txid;
  -     }
  -
  -     public void initQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  -
  -             try {
  -
  -                     URL logFile = new URL(dataDirectory, 
dest.toString()+"-"+queueId+".dat");
  -                     SpyMessageLog log = new SpyMessageLog(logFile.getFile());
   
  -                     LogInfo info = new LogInfo(log, dest, queueId);
  -                     
  -                     messageLogs.put(""+dest+"-"+queueId, info);
  -
  -             } catch (javax.jms.JMSException e) {
  -                     throw e;
  -             } catch (Exception e) {
  -                     javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  -             }
  -
  -     }
  -
  -     public void destroyQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  -
  -             try {
  -
  -                     URL logFile = new URL(dataDirectory, 
dest.toString()+"-"+queueId+".dat");
  -                     java.io.File file = new java.io.File(logFile.getFile());
  -     
  -                     SpyMessageLog log = 
(SpyMessageLog)messageLogs.remove(""+dest+"-"+queueId);
  -                     if( log == null )
  -                             throw new JMSException("The persistence log was never 
initialized");
  -                     log.close();
  -
  -                     file.delete();
  -
  -             } catch (javax.jms.JMSException e) {
  -                     throw e;
  -             } catch (Exception e) {
  -                     javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  -             }
  -
  -     }
  -
  -     public void add(String queueId, org.jbossmq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
  -
  -             LogInfo logInfo;
  -
  -             synchronized (messageLogs) {
  -                     logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
  -             }
  -
  -             if (logInfo == null)
  -                     throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  -
  -             logInfo.log.add(message, txId);
  -
  -     }
  -
  -     public void remove(String queueId, org.jbossmq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
  -
  -             LogInfo logInfo;
  +             return txid;
  +  }
   
  -             synchronized (messageLogs) {
  -                     logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
  -             }
  +     /**
  +      * Restore messages from the persistent queues.
  +      */
  +     public abstract void restore() throws javax.jms.JMSException;
   
  -             if (logInfo == null)
  -                     throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  +     /**
  +      * Create and return a unique transaction id.
  +      */
  +     public abstract Long createPersistentTx() throws javax.jms.JMSException;
   
  -             logInfo.log.remove(message, txId);
  +  /**
  +   * Commit the transaction to the persistent store.
  +   */
  +     public abstract void commitPersistentTx(Long txId) throws 
javax.jms.JMSException;
  +
  +  /**
  +   * Rollback the transaction.
  +   */
  +     public abstract void rollbackPersistentTx(Long txId) throws 
javax.jms.JMSException;
  +
  +  /**
  +   * Initialize the queue.
  +   */
  +     public abstract void initQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException;
  +
  +  /**
  +   * Remove the queue, and all messages in it, from the persistent store
  +   */
  +     public abstract void destroyQueue( SpyDestination dest, String queueId ) 
throws javax.jms.JMSException;
  +
  +  /**
  +   * Add message to the persistent store.
  +   * If the message is part of a transaction, txId is not null.
  +   */
  +     public abstract void add(String queueId, SpyMessage message, Long txId) throws 
javax.jms.JMSException;
  +
  +  /**
  +   * Remove message from the persistent store.
  +   * If the message is part of a transaction, txId is not null.
  +   */
  +     public abstract void remove(String queueId, SpyMessage message, Long txId) 
throws javax.jms.JMSException;
   
  -     }
   }
  
  
  
  1.3       +58 -53    jbossmq/src/main/org/jbossmq/server/JMSDestination.java
  
  Index: JMSDestination.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JMSDestination.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- JMSDestination.java       2001/03/02 01:13:02     1.2
  +++ JMSDestination.java       2001/05/15 07:16:48     1.3
  @@ -23,13 +23,13 @@
   import org.jbossmq.persistence.SpyMessageLog;
   
   /**
  - *   This class is a message queue which is stored (hashed by Destination) on the 
  + *   This class is a message queue which is stored (hashed by Destination) on the
    *   JMS provider
  - *      
  + *
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
  - * 
  - *   @version $Revision: 1.2 $
  + *
  + *   @version $Revision: 1.3 $
    */
   public class JMSDestination {
   
  @@ -41,25 +41,25 @@
        ClientConsumer temporaryDestination;
        //The JMSServer object
        JMSServer server;
  -     //Am I a queue or a topic  
  +     //Am I a queue or a topic
        boolean isTopic;
        //Counter used to number incomming messages. (Used to order the messages.)
        long messageIdCounter = Long.MIN_VALUE;
  -     //Hashmap of ExclusiveQueues 
  +     //Hashmap of ExclusiveQueues
        HashMap exclusiveQueues = new HashMap();
        //ShareQueue used for topics
        SharedQueue sharedQueue;
   
  -     // Constructor ---------------------------------------------------         
  +     // Constructor ---------------------------------------------------
        JMSDestination(SpyDestination dest,ClientConsumer temporary,JMSServer server) 
throws JMSException
        {
                destination=dest;
                temporaryDestination=temporary;
                this.server=server;
                isTopic=dest instanceof SpyTopic;
  -             
  +
                sharedQueue = new SharedQueue(server);
  -             
  +
                if( !isTopic ) {
                        exclusiveQueues.put(DEFAULT_QUEUE_ID, new 
ExclusiveQueue(server));
   
  @@ -68,12 +68,12 @@
                                server.persistenceManager.initQueue(dest, 
DEFAULT_QUEUE_ID);
                        }
                }
  -             
  +
        }
   
        public void addMessage(SpyMessage mes, Long txId) throws JMSException
        {
  -     
  +
                Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
   
                if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT &&
  @@ -81,59 +81,61 @@
                        throw new JMSException("Cannot write a persistent message to a 
temporary destination!");
                }
   
  -             
  +
                //Number the message so that we can preserve order of delivery.
  -             mes.messageId = messageIdCounter++;
  +             synchronized(this) {
  +               mes.messageId = messageIdCounter++;
  +             }
   
                if( isTopic ) {
  -                     
  +
                        sharedQueue.addMessage(mes, txId);
  -                     
  +
                        synchronized (exclusiveQueues) {
  -                             
  +
                                if( exclusiveQueues.size() == 0 )
                                        return;
  -             
  +
                                Iterator iter = exclusiveQueues.keySet().iterator();
                                while( iter.hasNext() ) {
  -                                     
  +
                                        String queueId = (String)iter.next();
                                        ExclusiveQueue eq = 
(ExclusiveQueue)exclusiveQueues.get(queueId);
   
  -                                     if( mes.getJMSDeliveryMode() == 
DeliveryMode.PERSISTENT ) 
  +                                     if( mes.getJMSDeliveryMode() == 
DeliveryMode.PERSISTENT )
                                                server.persistenceManager.add(queueId, 
mes, txId);
  -                                             
  +
                                        eq.addMessage(mes, txId);
  -                                     
  +
                                }
                        }
  -                     
  +
                } else {
  -                     
  +
                        ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( 
DEFAULT_QUEUE_ID );
  -                     if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) 
  +                     if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT )
                                server.persistenceManager.add(DEFAULT_QUEUE_ID, mes, 
txId);
  -                             
  +
                        eq.addMessage(mes, txId);
  -                     
  +
                }
  -             
  +
        }
  -     
   
  -     // Package protected ---------------------------------------------          
  +
  +     // Package protected ---------------------------------------------
        void addExclusiveConsumer(String queue, ClientConsumer c) throws JMSException {
   
                Log.log(""+this+"->addExclusiveConsumer(queue="+queue+", 
consumer="+c+")");
  -             
  +
                ExclusiveQueue eq = getExclusiveQueue( queue );
                if( eq == null )
                        throw new JMSException("That destination queue does not 
exist");
  -             
  +
                eq.addConsumer(c);
        }
   
  -     // Package protected ---------------------------------------------          
  +     // Package protected ---------------------------------------------
        void addSharedConsumer(ClientConsumer c) throws JMSException {
                Log.log(""+this+"->addSharedConsumer(consumer="+c+")");
                sharedQueue.addConsumer(c);
  @@ -145,16 +147,16 @@
                return eq.browse( selector );
        }
   
  -     // Package protected ---------------------------------------------          
  +     // Package protected ---------------------------------------------
        ExclusiveQueue getExclusiveQueue(String queue) {
  -             
  +
                synchronized (exclusiveQueues) {
  -                     return (ExclusiveQueue)exclusiveQueues.get( queue );           
 
  +                     return (ExclusiveQueue)exclusiveQueues.get( queue );
                }
  -             
  +
        }
   
  -     // Package protected ---------------------------------------------          
  +     // Package protected ---------------------------------------------
        void removeConsumerFromAll(ClientConsumer c) throws JMSException {
                Log.log(""+this+"->removeConsumerFromAll(consumer="+c+")");
   
  @@ -167,33 +169,36 @@
                                eq.removeConsumer(c);
                        }
                }
  -             
  +
        }
   
  -     // Package protected ---------------------------------------------          
  +     // Package protected ---------------------------------------------
        void removeExclusiveConsumer(String queue, ClientConsumer c) throws 
JMSException {
   
                Log.log(""+this+"->removeExclusiveConsumer(queue="+queue+", 
consumer="+c+")");
  -             
  +
                ExclusiveQueue eq = getExclusiveQueue( queue );
                if( eq == null )
                        throw new JMSException("That destination queue does not 
exist");
  -             
  +
                eq.removeConsumer(c);
        }
   
  -     // Package protected ---------------------------------------------          
  +     // Package protected ---------------------------------------------
        void removeSharedConsumer(ClientConsumer c) throws JMSException {
                Log.log(""+this+"->removeSharedConsumer(consumer="+c+")");
                sharedQueue.removeConsumer(c);
        }
   
        //Used to put a message that was added previously to the queue, back in the 
queue
  -     public void restoreMessage(SpyMessage mes, String queueId) 
  +     public void restoreMessage(SpyMessage mes, String queueId)
        {
                Log.log(""+this+"->restoreMessage(mes="+mes+",queue="+queueId+")");
  -             ExclusiveQueue eq = getExclusiveQueue(queueId);         
  -             eq.restoreMessage(mes);         
  +             synchronized(this) {
  +               messageIdCounter = Math.max(messageIdCounter, mes.messageId+1);
  +             }
  +             ExclusiveQueue eq = getExclusiveQueue(queueId);
  +             eq.restoreMessage(mes);
        }
   
        public String toString() {
  @@ -202,33 +207,33 @@
   
        public void createDurableSubscription(String clientId, String 
subscriptionName) throws JMSException
        {
  -             if( !isTopic ) 
  +             if( !isTopic )
                        throw new JMSException("Not a valid operation on a Queue");
   
                String queueId = 
durableSubscriptionToQueueId(clientId,subscriptionName);
  -             
  +
                synchronized (exclusiveQueues) {
                        exclusiveQueues.put(queueId, new ExclusiveQueue(server));
                }
  -             
  +
                server.persistenceManager.initQueue(destination, queueId);
  -             
  +
        }
   
        public void destoryDurableSubscription(String clientId, String 
subscriptionName) throws JMSException
        {
  -             if( !isTopic ) 
  +             if( !isTopic )
                        throw new JMSException("Not a valid operation on a Queue");
   
                String queueId = 
durableSubscriptionToQueueId(clientId,subscriptionName);
  -             synchronized (exclusiveQueues) {                
  +             synchronized (exclusiveQueues) {
                        exclusiveQueues.remove(queueId);
                }
                server.persistenceManager.destroyQueue(destination, queueId);
  -             
  +
        }
   
  -     static public String durableSubscriptionToQueueId(String clientId, String 
subscriptionName) 
  +     static public String durableSubscriptionToQueueId(String clientId, String 
subscriptionName)
        {
                return clientId+"-"+subscriptionName;
        }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to