User: hiram   
  Date: 01/01/10 05:57:45

  Modified:    src/java/org/spydermq/server SharedQueue.java
                        PersistenceManager.java JMSServer.java
                        JMSDestination.java ExclusiveQueue.java
                        ClientConsumer.java BasicQueue.java
                        AbstractQueue.java
  Added:       src/java/org/spydermq/server PersistentMessageEnvelope.java
                        NonPersistentMessageEnvelope.java
                        MessageEnvelope.java
  Log:
  Feature Add:  Faster recovery after a spyderMQ server failure
  Feature Add:  Better server scalability by moving message not in the working set to 
secondary storage.
  
  Revision  Changes    Path
  1.4       +6 -6      spyderMQ/src/java/org/spydermq/server/SharedQueue.java
  
  Index: SharedQueue.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/SharedQueue.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SharedQueue.java  2000/12/26 19:54:32     1.3
  +++ SharedQueue.java  2001/01/10 13:57:43     1.4
  @@ -23,7 +23,7 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class SharedQueue extends BasicQueue {
   
  @@ -38,7 +38,7 @@
        public void run() throws JMSException 
        {       
                Log.log(""+this+"->run()");
  -             SpyMessage[] job;
  +             MessageEnvelope[] envelopes;
                                 
                synchronized (messages) {
                        if( messages.size() == 0 ) {
  @@ -46,8 +46,8 @@
                                return;
                        }
                                
  -                     job=new SpyMessage[messages.size()];
  -                     job=(SpyMessage[])messages.toArray(job);
  +                     envelopes=new MessageEnvelope[messages.size()];
  +                     envelopes=(MessageEnvelope[])messages.toArray(envelopes);
                        messages.clear();                       
                }
   
  @@ -61,8 +61,8 @@
                        
                        ClientConsumer consumer = (ClientConsumer)iter.next();
                
  -                     for( int i=0 ; i < job.length; i++ ) 
  -                             consumer.addMessage(job[i]);
  +                     for( int i=0 ; i < envelopes.length; i++ ) 
  +                             consumer.addMessage(envelopes[i].getMessage());
                                                
                        consumer.notifyMessageAvailable();
                        
  
  
  
  1.6       +62 -27    spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/PersistenceManager.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- PersistenceManager.java   2000/12/27 17:02:22     1.5
  +++ PersistenceManager.java   2001/01/10 13:57:43     1.6
  @@ -16,17 +16,19 @@
   
   import org.spydermq.xml.XElement;
   import org.spydermq.persistence.SpyTxLog;
  -import org.spydermq.persistence.SpyMessageLog;
  +import org.spydermq.persistence.SpyMessageQueue;
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyMessage;
   import org.spydermq.SpyDistributedConnection;
   
  +import java.io.File;
  +
   /**
    *   This class manages all persistence related services.
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.5 $
  + *   @version $Revision: 1.6 $
    */
   public class PersistenceManager {
   
  @@ -35,10 +37,10 @@
        // The configuration data for the manager.
        XElement configElement;
        // The directory where persistence data should be stored
  -     URL dataDirectory;
  +     File dataDirectory;
        // Log file used to store commited transactions.
        SpyTxLog spyTxLog;
  -     // Maps SpyDestinations to SpyMessageLogs
  +     // Maps SpyDestinations to SpyMessageQueues
        HashMap messageLogs = new HashMap();
        // Maps (Long)txIds to LinkedList of Runnable tasks
        HashMap postCommitTasks = new HashMap();
  @@ -76,11 +78,11 @@
        }
   
        static class LogInfo {
  -             SpyMessageLog log;
  +             SpyMessageQueue log;
                SpyDestination destination;
                String queueId;
   
  -             LogInfo(SpyMessageLog log, SpyDestination destination, String queueId) 
{
  +             LogInfo(SpyMessageQueue log, SpyDestination destination, String 
queueId) {
                        this.log=log;
                        this.destination=destination;
                        this.queueId=queueId;
  @@ -99,9 +101,10 @@
                        this.configElement = configElement;
   
                        URL configFile = 
getClass().getClassLoader().getResource("spyderMQ.xml");
  -                     dataDirectory = new URL(configFile, 
configElement.getField("DataDirectory"));
  -                     URL txLogFile = new URL(dataDirectory, "transactions.dat");
  -                     spyTxLog = new SpyTxLog(txLogFile.getFile());
  +                     dataDirectory = new File(new URL(configFile, 
configElement.getField("DataDirectory")).getFile());
  +                     dataDirectory.mkdirs();
  +                     File txLogFile = new File(dataDirectory, "transactions.dat");
  +                     spyTxLog = new SpyTxLog(txLogFile.getAbsolutePath());
   
                } catch (Exception e) {
                        javax.jms.JMSException newE = new 
javax.jms.JMSException("Invalid configuration.");
  @@ -112,21 +115,8 @@
        }
   
        
  -     public void add(String queueId, org.spydermq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
   
  -             LogInfo logInfo;
   
  -             synchronized (messageLogs) {
  -                     logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
  -             }
  -
  -             if (logInfo == null)
  -                     throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  -
  -             logInfo.log.add(message, txId);
  -
  -     }
  -
        
        public void addPostCommitTask(Long txId, Runnable task) throws 
javax.jms.JMSException {
   
  @@ -299,8 +289,9 @@
   
                try {
   
  -                     URL logFile = new URL(dataDirectory, 
dest.toString()+"-"+queueId+".dat");
  -                     SpyMessageLog log = new SpyMessageLog(logFile.getFile());
  +                     File logFile = new File(dataDirectory, 
dest.toString()+"-"+queueId+".dat");
  +                     File messageDirectory = new File(dataDirectory, 
dest.toString()+"-"+queueId+"-messages");
  +                     SpyMessageQueue log = new SpyMessageQueue(this, 
logFile.getAbsolutePath(), messageDirectory.getAbsolutePath());
   
                        LogInfo info = new LogInfo(log, dest, queueId);
                        
  @@ -320,10 +311,9 @@
   
                try {
   
  -                     URL logFile = new URL(dataDirectory, 
dest.toString()+"-"+queueId+".dat");
  -                     java.io.File file = new java.io.File(logFile.getFile());
  +                     File file = new File(dataDirectory, 
dest.toString()+"-"+queueId+".dat");
        
  -                     SpyMessageLog log = 
(SpyMessageLog)messageLogs.remove(""+dest+"-"+queueId);
  +                     SpyMessageQueue log = 
(SpyMessageQueue)messageLogs.remove(""+dest+"-"+queueId);
                        if( log == null )
                                throw new JMSException("The persistence log was never 
initialized");
                        log.close();
  @@ -337,6 +327,51 @@
                        newE.setLinkedException(e);
                        throw newE;
                }
  +
  +     }
  +
  +     public java.io.File add(String queueId, org.spydermq.SpyMessage message, Long 
txId) throws javax.jms.JMSException {
  +
  +             LogInfo logInfo;
  +
  +             synchronized (messageLogs) {
  +                     logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
  +             }
  +
  +             if (logInfo == null)
  +                     throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  +
  +             return logInfo.log.add(message, txId);
  +
  +     }
  +
  +     public File getPersistenceFileFor(SpyMessage mes, String queueId) throws 
javax.jms.JMSException {
  +
  +             LogInfo logInfo;
  +
  +             synchronized (messageLogs) {
  +                     logInfo = (LogInfo) 
messageLogs.get(""+mes.getJMSDestination()+"-"+queueId);
  +             }
  +
  +             if (logInfo == null)
  +                     throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  +
  +             return logInfo.log.messageIdToFile(mes.messageId);
  +
  +     }
  +
  +     public File getSpyMessageQueue(SpyMessage mes, String queueId) throws 
javax.jms.JMSException {
  +
  +             LogInfo logInfo;
  +
  +             synchronized (messageLogs) {
  +                     logInfo = (LogInfo) 
messageLogs.get(""+mes.getJMSDestination()+"-"+queueId);
  +             }
  +
  +             if (logInfo == null)
  +                     throw new javax.jms.JMSException("Destination was not 
initalized with the PersistenceManager");
  +
  +             return logInfo.log.messageIdToFile(mes.messageId);
   
        }
   }
  
  
  
  1.12      +3 -1      spyderMQ/src/java/org/spydermq/server/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- JMSServer.java    2001/01/04 23:24:51     1.11
  +++ JMSServer.java    2001/01/10 13:57:43     1.12
  @@ -27,7 +27,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.11 $
  + *   @version $Revision: 1.12 $
    */
   public class JMSServer 
                implements Runnable, JMSServerMBean
  @@ -151,6 +151,8 @@
                                task.run();
                        } catch (JMSException e) {
                                Log.error(e);
  +                             if( e.getLinkedException() != null )
  +                                     Log.error( e.getLinkedException() );
                        }
                }
   
  
  
  
  1.5       +16 -11    spyderMQ/src/java/org/spydermq/server/JMSDestination.java
  
  Index: JMSDestination.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSDestination.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- JMSDestination.java       2001/01/03 23:25:06     1.4
  +++ JMSDestination.java       2001/01/10 13:57:43     1.5
  @@ -26,7 +26,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class JMSDestination {
   
  @@ -84,7 +84,7 @@
   
                if( isTopic ) {
                        
  -                     sharedQueue.addMessage(mes, txId);
  +                     sharedQueue.addMessage(new MessageEnvelope(mes), txId);
                        
                        synchronized (exclusiveQueues) {
                                
  @@ -97,10 +97,12 @@
                                        String queueId = (String)iter.next();
                                        ExclusiveQueue eq = 
(ExclusiveQueue)exclusiveQueues.get(queueId);
   
  -                                     if( mes.getJMSDeliveryMode() == 
DeliveryMode.PERSISTENT ) 
  -                                             server.persistenceManager.add(queueId, 
mes, txId);
  -                                             
  -                                     eq.addMessage(mes, txId);
  +                                     if( mes.getJMSDeliveryMode() == 
DeliveryMode.PERSISTENT ) {
  +                                             java.io.File f = 
server.persistenceManager.add(queueId, mes, txId);
  +                                             eq.addMessage(new 
PersistentMessageEnvelope(mes,f), txId);
  +                                     } else {
  +                                             eq.addMessage(new 
NonPersistentMessageEnvelope(mes, 
server.persistenceManager.getPersistenceFileFor(mes,DEFAULT_QUEUE_ID)), txId);
  +                                     }
                                        
                                }
                        }
  @@ -108,10 +110,13 @@
                } else {
                        
                        ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get( 
DEFAULT_QUEUE_ID );
  -                     if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) 
  -                             server.persistenceManager.add(DEFAULT_QUEUE_ID, mes, 
txId);
  -                             
  -                     eq.addMessage(mes, txId);
  +                     if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
  +                             java.io.File f = 
server.persistenceManager.add(DEFAULT_QUEUE_ID, mes, txId);
  +                             eq.addMessage(new PersistentMessageEnvelope(mes, f), 
txId);
  +                     } else {
  +                             eq.addMessage(new NonPersistentMessageEnvelope(mes, 
server.persistenceManager.getPersistenceFileFor(mes,DEFAULT_QUEUE_ID)), txId);
  +                     }
  +                     
                        
                }
                
  @@ -190,7 +195,7 @@
        {
                Log.log(""+this+"->restoreMessage(mes="+mes+",queue="+queueId+")");
                ExclusiveQueue eq = getExclusiveQueue(queueId);         
  -             eq.restoreMessage(mes);         
  +             eq.restoreMessage( new MessageEnvelope(mes) );
        }
   
        public String toString() {
  
  
  
  1.5       +29 -7     spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java
  
  Index: ExclusiveQueue.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- ExclusiveQueue.java       2001/01/03 23:25:06     1.4
  +++ ExclusiveQueue.java       2001/01/10 13:57:43     1.5
  @@ -24,11 +24,19 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class ExclusiveQueue extends BasicQueue {
   
  +     private static final long MSG_IDLE_TIME_ALLOWENCE = 1000*10; // 10 seconds
  +     private long lastActiveConsumerTS=0;
   
  +     // Constructor ---------------------------------------------------         
  +     public ExclusiveQueue(JMSServer server) throws JMSException
  +     {
  +             super(server);
  +     }
  +
        // Iterate over the consumers asking them to take messages until they stop
        // consuming.
        public void run() throws JMSException 
  @@ -38,7 +46,24 @@
                 
                synchronized (messages) {                                              
 
                        synchronized (consumers) {
  -                                                             
  +                             
  +                             if( consumers.size() ==0 ) {
  +                                     if( (System.currentTimeMillis() - 
lastActiveConsumerTS) 
  +                                                     > MSG_IDLE_TIME_ALLOWENCE ) {
  +                                             
  +                                             // No consumers.. move messages to 
secondary storage 
  +                                             // There have been no consumers for a 
while.
  +                                             Iterator i = messages.iterator();
  +                                             while( i.hasNext() ) {
  +                                                     MessageEnvelope me = 
(MessageEnvelope)i.next();
  +                                                     me.moveToSecondaryStorage();
  +                                             }
  +                                             
  +                                     }
  +
  +                                     return;
  +                             }
  +                                     
                                LinkedList consumersDone = new LinkedList();
   
                                while( consumers.size()!=0 && messages.size() != 0) {
  @@ -61,6 +86,8 @@
                                while( consumersDone.size() != 0 ) {
                                        consumers.addLast(consumersDone.removeFirst());
                                }
  +
  +                             lastActiveConsumerTS = System.currentTimeMillis();
                                        
                        }                                                              
                 
                }
  @@ -71,9 +98,4 @@
                return "ExclusiveQueue";
        }
   
  -     // Constructor ---------------------------------------------------         
  -     public ExclusiveQueue(JMSServer server) throws JMSException
  -     {
  -             super(server);
  -     }
   }
  
  
  
  1.7       +9 -8      spyderMQ/src/java/org/spydermq/server/ClientConsumer.java
  
  Index: ClientConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ClientConsumer.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- ClientConsumer.java       2001/01/04 23:24:51     1.6
  +++ ClientConsumer.java       2001/01/10 13:57:43     1.7
  @@ -27,7 +27,7 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
   public class ClientConsumer implements Task {
   
  @@ -389,22 +389,23 @@
                Iterator i = queue.messages.iterator();
                while( i.hasNext() ) {
                        
  -                     SpyMessage message = (SpyMessage)i.next();
  +                     MessageEnvelope envelope = (MessageEnvelope)i.next();
  +                     SpyMessage headers = envelope.getHeadersMessage();
                        
  -                     LinkedList l = (LinkedList)destinationSubscriptions.get( 
message.getJMSDestination() );
  +                     LinkedList l = (LinkedList)destinationSubscriptions.get( 
headers.getJMSDestination() );
                        if( l == null ) return false;
   
                        Iterator subs = l.iterator();                   
                        while(  subs.hasNext() ) {
                                
                                Subscription s = (Subscription)subs.next();
  -                             if( s.accepts( message, true ) ) {
  +                             if( s.accepts( headers, true ) ) {
   
                                        s.receiving = false;
                                        i.remove();
                                        
                                        ReceiveRequest r = new ReceiveRequest();
  -                                     r.message = message;
  +                                     r.message = envelope.getMessage();
                                        r.subscriptionId = new 
Integer(s.subscriptionId);
                                                                                
                                        synchronized (messages) {
  @@ -412,13 +413,13 @@
                                        }
                                        
                                        AcknowledgementRequest ack = new 
AcknowledgementRequest();
  -                                     ack.destination = message.getJMSDestination();
  -                                     ack.messageID = message.getJMSMessageID();
  +                                     ack.destination = headers.getJMSDestination();
  +                                     ack.messageID = headers.getJMSMessageID();
                                        ack.subscriberId = s.subscriptionId;
                                        ack.isAck = false;
   
                                        synchronized (unacknowledgedMessages) {
  -                                             unacknowledgedMessages.put(ack, 
message);
  +                                             unacknowledgedMessages.put(ack, 
r.message);
                                        }
                                        
                                        notifyMessageAvailable();
  
  
  
  1.2       +63 -61    spyderMQ/src/java/org/spydermq/server/BasicQueue.java
  
  Index: BasicQueue.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/BasicQueue.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- BasicQueue.java   2000/12/24 01:55:06     1.1
  +++ BasicQueue.java   2001/01/10 13:57:44     1.2
  @@ -28,7 +28,7 @@
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   abstract public class BasicQueue implements Task, AbstractQueue {
   
  @@ -47,49 +47,7 @@
                this.server=server;
                                        
        }
  -             
  -     //Used to put a message that was added previously to the queue, back in the 
queue
  -     public void restoreMessage(SpyMessage mes) 
  -     {
  -             //restore a message to the message list...
  -             synchronized (messages) {
  -                     messages.add(mes);      
  -             }
  -             notifyMessageAvailable();
  -     }       
  -     
  -     public void addMessage(SpyMessage mes, Long txId) throws JMSException
  -     {
  -             Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
   
  -             // This task gets run to make the message visible in the queue.
  -             class AddMessagePostCommitTask implements Runnable {
  -                     SpyMessage message;
  -                     
  -                     AddMessagePostCommitTask(SpyMessage m) {
  -                             message = m;
  -                     }
  -                     
  -                     public void run() {
  -                             //restore a message to the message list...
  -                             synchronized (messages) {
  -                                     messages.add(message);  
  -                             }
  -                             notifyMessageAvailable();
  -                     }
  -             }
  -             
  -             // The message gets added to the queue after the transaction
  -             // commits (if the message was transacted)      
  -             Runnable task = new AddMessagePostCommitTask(mes);
  -             if( txId == null ) {
  -                     task.run();
  -             } else {
  -                     server.persistenceManager.addPostCommitTask(txId, task);
  -             }
  -             
  -     }
  -
        // 
        public void addConsumer(ClientConsumer consumer) throws JMSException
        {
  @@ -104,29 +62,36 @@
        public SpyMessage[] browse(String selector) throws JMSException {
                        
                if( selector == null ) {
  -                     SpyMessage list[];
  +                     MessageEnvelope list[];
                        synchronized (messages) {
  -                             list = new SpyMessage[messages.size()];
  -                             list = (SpyMessage [])messages.toArray(list);
  +                             list = new MessageEnvelope[messages.size()];
  +                             list = (MessageEnvelope [])messages.toArray(list);
                        }
  -                     return list;
  +                     
  +                     SpyMessage messageList[] = new SpyMessage[list.length];
  +                     for( int i=0; i < list.length; i++ ) 
  +                             messageList[i] = list[i].getMessage();
  +                     return messageList;
  +     
                } else {
                        Selector s = new Selector( selector );
                        LinkedList selection=new LinkedList();
  -                     
  +
  +                     MessageEnvelope list[];
                        synchronized (messages) {
  -                             Iterator i = messages.iterator();
  -                             while( i.hasNext() ) {
  -                                     SpyMessage m = (SpyMessage)i.next();
  -                                     if( s.test(m) )
  -                                             selection.add(m);
  -                             }
  +                             list = new MessageEnvelope[messages.size()];
  +                             list = (MessageEnvelope [])messages.toArray(list);
                        }
  -                     
  -                     SpyMessage list[];
  -                     list = new SpyMessage[selection.size()];
  -                     list = (SpyMessage [])selection.toArray(list);
  -                     return list;                    
  +
  +                     for( int i=0; i < list.length; i++ ) {
  +                             SpyMessage m = list[i].getMessage();
  +                             if( s.test(m) )
  +                                     selection.add(m);
  +                     }
  +                                     
  +                     SpyMessage messageList[] = new SpyMessage[selection.size()];
  +                     messageList = (SpyMessage [])selection.toArray(messageList);
  +                     return messageList;                     
                }
        }
   
  @@ -148,10 +113,10 @@
                        if (messages.size()==0) 
                                return null;
                                
  -                     SpyMessage m = (SpyMessage)messages.first();
  +                     MessageEnvelope m = (MessageEnvelope)messages.first();
                        messages.remove(m);
                        
  -                     return m;
  +                     return m.getMessage();
                }
        }
   
  @@ -164,4 +129,41 @@
                }
        }
   
  +     public void addMessage(MessageEnvelope mes, Long txId) throws JMSException
  +     {
  +             Log.log(""+this+"->addMessage(mes="+mes.messageId+",txId="+txId+")");
  +
  +             // This task gets run to make the message visible in the queue.
  +             class AddMessagePostCommitTask implements Runnable {
  +                     MessageEnvelope message;
  +                     
  +                     AddMessagePostCommitTask(MessageEnvelope m) {
  +                             message = m;
  +                     }
  +                     
  +                     public void run() {
  +                             //restore a message to the message list...
  +                             synchronized (messages) {
  +                                     messages.add(message);  
  +                             }
  +                             notifyMessageAvailable();
  +                     }
  +             }
  +             
  +             // The message gets added to the queue after the transaction
  +             // commits 
  +             Runnable task = new AddMessagePostCommitTask( mes );
  +             server.persistenceManager.addPostCommitTask(txId, task);
  +             
  +     }
  +
  +     //Used to put a message that was added previously to the queue, back in the 
queue
  +     public void restoreMessage(MessageEnvelope mes) 
  +     {
  +             //restore a message to the message list...
  +             synchronized (messages) {
  +                     messages.add(mes);      
  +             }
  +             notifyMessageAvailable();
  +     }
   }
  
  
  
  1.2       +5 -3      spyderMQ/src/java/org/spydermq/server/AbstractQueue.java
  
  Index: AbstractQueue.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/AbstractQueue.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- AbstractQueue.java        2000/12/23 15:48:25     1.1
  +++ AbstractQueue.java        2001/01/10 13:57:44     1.2
  @@ -15,11 +15,13 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public interface AbstractQueue {
  +
  +     public void removeConsumer(ClientConsumer consumer) throws JMSException;
        public void addConsumer(ClientConsumer consumer) throws JMSException;
  -     public void addMessage(SpyMessage mes, Long txId) throws JMSException;
  +
  +     public void addMessage(MessageEnvelope mes, Long txId) throws JMSException;
        void notifyMessageAvailable();
  -     public void removeConsumer(ClientConsumer consumer) throws JMSException;
   }
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/server/PersistentMessageEnvelope.java
  
  Index: PersistentMessageEnvelope.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.JMSException;
  
  import java.io.File;
  import java.io.IOException;
  import java.io.FileInputStream;
  import java.io.BufferedInputStream;
  import java.io.ObjectInputStream;
  
  import org.spydermq.SpyMessage;
  
  /**
   *    This class represents an evelope used to place NonPersistent SpyMessages in.
   *
   *  This class will use an existing file to read the message from when loading
   *    from secondary storeage.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class PersistentMessageEnvelope extends MessageEnvelope {
  
        File persistenceFile;
        byte envelopeState;
  
        private static final byte MESSAGE_LOADED_STATE = 0;
        private static final byte HEADERS_LOADED_STATE = 1;
        private static final byte NOT_LOADED_STATE = 2;
  /**
   * MessageEnvelope constructor comment.
   */
  public PersistentMessageEnvelope(SpyMessage m, File messageImage ) {
  
        super( m );
        persistenceFile = messageImage;
        envelopeState = MESSAGE_LOADED_STATE;
        
  }
        public SpyMessage getHeadersMessage() throws JMSException {
                if( message == null )
                        loadFromStorage();
                return message;
        }
        public SpyMessage getMessage() throws JMSException {
                if( message == null )
                        loadFromStorage();
                return message;
        }
        private void loadFromStorage() throws JMSException {
                try {
                        
                        ObjectInputStream is = new ObjectInputStream(
                                new BufferedInputStream(
                                        new FileInputStream( persistenceFile ) ) );
  
                        message = (SpyMessage)is.readObject();
                        message.messageId = messageId;
                        is.close();
                                
                } catch (Exception e ) {
                        throwJMSException("Could not read a persisted message", e);
                }
  
        }
        public void moveToSecondaryStorage() throws JMSException {
                message = null;
        }
        private void throwJMSException(String message, Exception e) throws 
JMSException {
                JMSException newE = new JMSException(message);
                newE.setLinkedException(e);
                throw newE;             
        }
  }
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/server/NonPersistentMessageEnvelope.java
  
  Index: NonPersistentMessageEnvelope.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.JMSException;
  
  import java.io.File;
  import java.io.IOException;
  import java.io.FileInputStream;
  import java.io.FileOutputStream;
  import java.io.BufferedOutputStream;
  import java.io.BufferedInputStream;
  import java.io.ObjectOutputStream;
  import java.io.ObjectInputStream;
  
  import org.spydermq.SpyMessage;
  
  /**
   *    This class represents an evelope used to place NonPersistent SpyMessages in.
   *
   *  This class will create a temporary file to be used as secondary storage.
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class NonPersistentMessageEnvelope extends MessageEnvelope {
  
        File persistenceFile;
        byte envelopeState;
        boolean isPersisted = false;
  
        private static final byte MESSAGE_LOADED_STATE = 0;
        private static final byte HEADERS_LOADED_STATE = 1;
        private static final byte NOT_LOADED_STATE = 2;
  
        public NonPersistentMessageEnvelope(SpyMessage m, File persistenceFile) {
  
                super(m);
  
                envelopeState = MESSAGE_LOADED_STATE;
                this.persistenceFile = persistenceFile;
  
        }
  
        public void finalize() {
                if (isPersisted) {
                        persistenceFile.delete();
                }
        }
        
        public SpyMessage getHeadersMessage() throws JMSException {
                if (message == null)
                        loadFromStorage();
                return message;
        }
        
        public SpyMessage getMessage() throws JMSException {
                if (message == null)
                        loadFromStorage();
                return message;
        }
  
        private void loadFromStorage() throws JMSException {
                try {
  
                        ObjectInputStream is = new ObjectInputStream(new 
BufferedInputStream(new FileInputStream(persistenceFile)));
  
                        message = (SpyMessage) is.readObject();
                        message.messageId = messageId;
                        is.close();
  
                } catch (Exception e) {
                        throwJMSException("Could not read a persisted message", e);
                }
  
        }
        
        public void moveToSecondaryStorage() throws JMSException {
  
                if (!isPersisted) {
  
                        try {
  
                                ObjectOutputStream os = new ObjectOutputStream(new 
BufferedOutputStream(new FileOutputStream(persistenceFile)));
                                os.writeObject(message);
                                os.close();
  
                                isPersisted = true;
  
                        } catch (IOException e) {
                                throwJMSException("Could not persist the message", e);
                        }
  
                }
  
                message = null;
        }
        
        private void throwJMSException(String message, Exception e) throws 
JMSException {
                JMSException newE = new JMSException(message);
                newE.setLinkedException(e);
                throw newE;
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/server/MessageEnvelope.java
  
  Index: MessageEnvelope.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.server;
  
  import javax.jms.JMSException;
  
  import java.io.File;
  
  import org.spydermq.SpyMessage;
  
  /**
   *    This class represents an evelope used to place SpyMessages in.
   *
   *  It holds the minimum amount of information need by the server
   *  to keep the message ordered in it's transmision queue.
   *
   *  Subclasses such as PersistentMessageEnvelope and NonPersistentMessageEnvelope
   *  extend this class so that message are moved to secondary storage
   *  when the message is not part of the working set.
   *      
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class MessageEnvelope implements Comparable {
  
        //For ordering in the JMSServerQueue (set on the server side)
        long messageId;
        int jmsPriority;
  
        SpyMessage message;
  
        /**
         * MessageEnvelope constructor comment.
         */
        public MessageEnvelope(SpyMessage m) {
  
                message = m;
                messageId = m.messageId;
                jmsPriority = m.getJMSPriority();
  
        }
        
        /**
         * used to order this message with respect to other message in
         * the servers queue.
         */
        public int compareTo(java.lang.Object o) {
                MessageEnvelope me = (MessageEnvelope) o;
  
                if (jmsPriority > me.jmsPriority) {
                        return -1;
                }
                if (jmsPriority < me.jmsPriority) {
                        return 1;
                }
                return (int) (messageId - me.messageId);
        }
  
        /**
         * return a message with at least the original headers
         * (That can be used to evaluagte a Selector)
         */
        public SpyMessage getHeadersMessage() throws JMSException {
                return message;
        }
  
        /**
         * returns the full original message.
         */
        public SpyMessage getMessage() throws JMSException {
                return message;
        }
  
        /**
         * This is a signal that the message is not in the working
         * set and should be moved to secondary storage.
         */
        public void moveToSecondaryStorage() throws JMSException {
        }
  }
  
  
  

Reply via email to