User: hiram   
  Date: 00/11/22 09:43:33

  Modified:    src/java/org/spydermq JMSServerQueueReceiver.java
                        JMSServerQueue.java JMSServer.java
  Log:
  Added a crude persistence scheme to support persistent messages.
  Its transactional behavior still needs some work.
  
  Revision  Changes    Path
  1.2       +12 -5     spyderMQ/src/java/org/spydermq/JMSServerQueueReceiver.java
  
  Index: JMSServerQueueReceiver.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueueReceiver.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JMSServerQueueReceiver.java       2000/11/19 19:59:57     1.1
  +++ JMSServerQueueReceiver.java       2000/11/22 17:43:33     1.2
  @@ -11,6 +11,7 @@
   import java.io.Serializable;
   import org.spydermq.distributed.interfaces.ConnectionReceiver;
   import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
  +import javax.jms.DeliveryMode;
   
   /**
    * This class manages a connection receiver for a JMSServerQueue.
  @@ -18,7 +19,7 @@
    *      
    *@author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *@version $Revision: 1.1 $
  + *@version $Revision: 1.2 $
    */
   public class JMSServerQueueReceiver implements Serializable {
   
  @@ -47,7 +48,7 @@
        }
   
        
  -     void acknowledge(String messageId, boolean ack) {
  +     void acknowledge(String messageId, boolean ack) throws javax.jms.JMSException {
                SpyMessage m;
                synchronized (unacknowledgedMessages) {
                        m = (SpyMessage) unacknowledgedMessages.remove(messageId);
  @@ -55,18 +56,24 @@
   
                if (m == null)
                        return;
  -
  -             if (!jmsSeverQueue.isTopic) {
  +                     
  +             if (jmsSeverQueue.isTopic) {
                        // Not sure how we should handle the topic case.
                        // On a negative acknowledge, we don't want to
                        // add it back to the topic since other
                        // receivers might get a duplicate duplicate message.
  -             } else {
  +             } else {                        
                        // Was it a negative acknowledge??
                        if (!ack) {
                                Log.log("Restoring message: " + m.messageId);
                                jmsSeverQueue.restoreMessage(m);
                        } else {
  +                             
  +                             if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
  +                                     jmsSeverQueue.spyMessageLog.logRemoveMessage(m);
  +                                     jmsSeverQueue.spyMessageLog.commit();
  +                             }
  +                             
                                Log.log("Message Ack: " + m.messageId);
                        }
                }
  
  
  
  1.18      +43 -10    spyderMQ/src/java/org/spydermq/JMSServerQueue.java
  
  Index: JMSServerQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- JMSServerQueue.java       2000/11/19 19:59:56     1.17
  +++ JMSServerQueue.java       2000/11/22 17:43:33     1.18
  @@ -13,13 +13,15 @@
   import java.util.LinkedList;
   import java.util.HashMap;
   import java.util.TreeSet;
  +import javax.jms.DeliveryMode;
  +import org.spydermq.persistence.SpyMessageLog;
   
   /**
    *This class is a message queue which is stored (hashed by Destination) on the JMS provider
    *      
    *@author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *@version $Revision: 1.17 $
  + *@version $Revision: 1.18 $
    */
   public class JMSServerQueue {
        // Attributes ----------------------------------------------------
  @@ -52,9 +54,12 @@
        // Should we use the round robin aproach to pick the next reciver of a p2p message?
        private boolean useRoundRobinMessageDistribution = true;
   
  +     // Used to log the persistent messages.
  +     SpyMessageLog spyMessageLog;
        
  +     
        // Constructor ---------------------------------------------------         
  -     JMSServerQueue(SpyDestination dest,SpyDistributedConnection temporary,JMSServer server)
  +     JMSServerQueue(SpyDestination dest,SpyDistributedConnection temporary,JMSServer server) throws JMSException
        {
                destination=dest;
                subscribers=new HashMap();
  @@ -65,6 +70,15 @@
                this.server=server;
                isTopic=dest instanceof SpyTopic;
                listeners=0;
  +             
  +             spyMessageLog = new SpyMessageLog( dest.name+"-transaction-log.dat" );
  +             SpyMessage[] rebuild = spyMessageLog.rebuildMessagesFromLog();
  +             for( int i=0; i < rebuild.length; i++ ) {
  +                     restoreMessage( rebuild[i] );
  +                     messageIdCounter = Math.max( messageIdCounter, rebuild[i].messageId+1 );
  +             }
  +             Log.notice("Restored "+rebuild.length+" messages to "+dest.name+" from the transaction log");
  +             
        }
   
   
  @@ -106,7 +120,7 @@
   
   
        
  -     public void addMessage(SpyMessage mes)
  +     public void addMessage(SpyMessage mes) throws JMSException
        {
                //Add a message to the message list... 
                synchronized (messages) 
  @@ -115,7 +129,13 @@
                        //messages is now an ordered tree. The order depends 
                        //first on the priority and then on messageId
                        mes.messageId = messageIdCounter++;
  -                     messages.add(mes);                              
  +
  +                     if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
  +                             spyMessageLog.logAddMessage(mes);
  +                             spyMessageLog.commit();
  +                     }
  +                     
  +                     messages.add(mes);
                        
                        if (isTopic) {
                                //if a thread is already working on this destination, I don't have to myself to the taskqueue
  @@ -268,10 +288,20 @@
        void doMyJob() throws JMSException 
        {                       
                if (isTopic) {                  
  -                     
  +             
                        //Clear the message queue
                        SpyMessage[] msgs=startWork();
  -                             
  +
  +                     boolean msgRemoved=false;
  +                     for( int i=0; i < msgs.length; i++ ) {
  +                             if( msgs[i].getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
  +                                     spyMessageLog.logRemoveMessage(msgs[i]);
  +                                     msgRemoved=true;
  +                             }
  +                     }
  +                     if( msgRemoved )
  +                             spyMessageLog.commit();
  +     
                        //Let the thread do its work
                        if (msgs.length == 1) {
                                
  @@ -308,8 +338,13 @@
                                                break;
                                        }
                                        
  -                                     if (mes.isOutdated()) 
  +                                     if (mes.isOutdated()) {
  +                                             if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
  +                                                     spyMessageLog.logRemoveMessage(mes);
  +                                                     spyMessageLog.commit();
  +                                             }
                                                continue;
  +                                     }
                                        
                                        // we may have to restore the lastUsedQueueReceiver
                                        // if message on the queue is not sent. (we don't want to skip 
  @@ -522,8 +557,6 @@
                        }
   
                }
  -     }
  -
  -
  +     }       
        
   }
  
  
  
  1.10      +3 -3      spyderMQ/src/java/org/spydermq/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServer.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- JMSServer.java    2000/11/19 19:59:56     1.9
  +++ JMSServer.java    2000/11/22 17:43:33     1.10
  @@ -22,7 +22,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.9 $
  + *   @version $Revision: 1.10 $
    */
   public class JMSServer 
                implements Runnable, JMSServerMBean
  @@ -268,7 +268,7 @@
                return newQueue;
        }
   
  -     public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc)
  +     public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws JMSException
        {
                SpyTemporaryTopic topic=new SpyTemporaryTopic("JMS_TT"+(new Integer(lastTemporaryTopic++).toString()),dc);
   
  @@ -282,7 +282,7 @@
                return topic;
        }
        
  -     public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc)
  +     public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws JMSException
        {
                SpyTemporaryQueue newQueue=new SpyTemporaryQueue("JMS_TQ"+(new Integer(lastTemporaryQueue++).toString()),dc);
   
  
  
  

Reply via email to