User: chirino 
  Date: 01/10/27 18:27:01

  Modified:    src/main/org/jboss/mq/pm/rollinglogged
                        PersistenceManager.java SpyMessageLog.java
  Log:
  Committing my initial implementation of a message cache for the JBossMQ messages.  
This should allow the server to scale so it can hold a larger number of messages.
  
  Revision  Changes    Path
  1.10      +17 -9     
jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManager.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- PersistenceManager.java   2001/10/11 02:19:03     1.9
  +++ PersistenceManager.java   2001/10/28 01:27:01     1.10
  @@ -27,12 +27,13 @@
   import org.jboss.mq.server.JMSServer;
   import org.jboss.mq.xml.XElement;
   import org.jboss.system.ServiceMBeanSupport;
  +import org.jboss.mq.server.MessageReference;
   
   /**
    *  This class manages all persistence related services.
    *
    * @author     David Maplesden ([EMAIL PROTECTED])
  - * @version    $Revision: 1.9 $
  + * @version    $Revision: 1.10 $
    */
   public class PersistenceManager extends ServiceMBeanSupport implements 
org.jboss.mq.pm.PersistenceManager, PersistenceManagerMBean
   {
  @@ -252,6 +253,9 @@
   
         File jbossHome = new File(System.getProperty("jboss.system.home"));
         dataDirFile = new File(jbossHome, dataDirectory);
  +      dataDirFile.mkdirs();
  +      if( !dataDirFile.isDirectory() )
  +         throw new Exception("The data directory is not valid: 
"+dataDirFile.getCanonicalPath());
   
         //Get an InitialContext
         JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[]{
  @@ -283,8 +287,9 @@
       * @param  txId                        Description of Parameter
       * @exception  javax.jms.JMSException  Description of Exception
       */
  -   public void add(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException
  +   public void add(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException
      {
  +       SpyMessage message = messageRef.getMessage();
         //System.out.println("Add message "+Long.toHexString(message.messageId)+" in 
trans "+Long.toHexString(txId.longValue())+" to "+message.getJMSDestination());
         LogInfo logInfo;
   
  @@ -319,7 +324,7 @@
         synchronized (logInfo)
         {
            logInfo.liveMessages++;
  -         message.persistData = logInfo;
  +         messageRef.persistData = logInfo;
            logInfo.log.add(message, txId);
         }
         if (txId != null)
  @@ -385,13 +390,14 @@
       * @param  txId                        Description of Parameter
       * @exception  javax.jms.JMSException  Description of Exception
       */
  -   public void remove(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId) 
throws javax.jms.JMSException
  +   public void remove(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException
      {
         //System.out.println("Removing message 
"+Long.toHexString(message.messageId)+" in trans 
"+Long.toHexString(txId.longValue())+" from "+message.getJMSDestination());
   
  +       SpyMessage message = messageRef.getMessage();
         LogInfo logInfo;
   
  -      SpyTxLog txLog = ((LogInfo)message.persistData).txLog;
  +      SpyTxLog txLog = ((LogInfo)messageRef.persistData).txLog;
         synchronized (messageLogs)
         {
            HashMap logs = (HashMap)messageLogs.get(txLog);
  @@ -412,7 +418,7 @@
            synchronized (transToTxLogs)
            {
               TxInfo txInfo = (TxInfo)transToTxLogs.get(txId);
  -            txInfo.ackMessages.add(message);
  +            txInfo.ackMessages.add(messageRef);
            }
         }
         if (txId == null)
  @@ -482,7 +488,7 @@
                  continue;
               String key = name.substring(0, name.length() - (sRollOver.length() + 
4));
               SpyMessageLog messageLog = new SpyMessageLog(dataFiles[i]);
  -            SpyMessage[] messages = messageLog.restore(commitedTxs);
  +            MessageReference[] messages = messageLog.restore(commitedTxs);
               SpyDestination dest = (SpyDestination)queues.get(key);
               if (dest != null)
               {
  @@ -499,7 +505,9 @@
                        messages[j].persistData = info;
                        if (dest instanceof org.jboss.mq.SpyTopic)
                        {
  -                        messages[j].durableSubscriberID = 
((org.jboss.mq.SpyTopic)dest).getDurableSubscriptionID();
  +                             SpyMessage mesg = messages[j].getMessage();
  +                        mesg.durableSubscriberID = 
((org.jboss.mq.SpyTopic)dest).getDurableSubscriptionID();
  +                        messages[j].invalidate(); // since we updated the message
                        }
                        q.restoreMessage(messages[j]);
                     }
  @@ -618,7 +626,7 @@
      {
         for (Iterator it = messages.iterator(); it.hasNext(); )
         {
  -         LogInfo info = ((LogInfo)((SpyMessage)it.next()).persistData);
  +         LogInfo info = ((LogInfo)((MessageReference)it.next()).persistData);
            synchronized (info)
            {
               --info.liveMessages;
  
  
  
  1.4       +14 -7     
jbossmq/src/main/org/jboss/mq/pm/rollinglogged/SpyMessageLog.java
  
  Index: SpyMessageLog.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/rollinglogged/SpyMessageLog.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SpyMessageLog.java        2001/09/04 02:22:29     1.3
  +++ SpyMessageLog.java        2001/10/28 01:27:01     1.4
  @@ -11,6 +11,7 @@
   import java.io.File;
   import javax.jms.JMSException;
   import org.jboss.mq.SpyJMSException;
  +import org.jboss.mq.server.MessageReference;
   
   import org.jboss.mq.SpyMessage;
   
  @@ -21,7 +22,7 @@
    *
    * @created    August 16, 2001
    * @author:    Hiram Chirino ([EMAIL PROTECTED])
  - * @version    $Revision: 1.3 $
  + * @version    $Revision: 1.4 $
    */
   public class SpyMessageLog {
   
  @@ -65,11 +66,12 @@
      }
   
   
  -   public synchronized SpyMessage[] restore( java.util.TreeSet commited )
  +   public synchronized MessageReference[] restore( java.util.TreeSet commited )
         throws JMSException {
   
         java.util.HashMap messageIndex = new java.util.HashMap();
  -
  +      org.jboss.mq.server.MessageCache cache = 
org.jboss.mq.server.JMSServer.getInstance().getMessageCache();
  +      
         try {
            java.util.LinkedList objects = transactionLog.toIndex();
   
  @@ -87,7 +89,8 @@
                     continue;
                  }
   
  -               messageIndex.put( new Long( r.messageId ), o );
  +               MessageReference mr = cache.add(r.message);
  +               messageIndex.put( new Long( r.messageId ), mr );
   
               } else if ( o instanceof IntegrityLog.MessageRemovedRecord ) {
   
  @@ -99,7 +102,11 @@
                     continue;
                  }
   
  -               messageIndex.remove( new Long( r.messageId ) );
  +               Long txid = new Long( r.messageId );  
  +               MessageReference mr = (MessageReference)messageIndex.get(txid );
  +               messageIndex.remove(txid );
  +               if( mr != null )
  +                  cache.remove(mr);   
   
               }
            }
  @@ -108,10 +115,10 @@
            throwJMSException( "Could not rebuild the queue from the queue's 
tranaction log.", e );
         }
   
  -      SpyMessage rc[] = new SpyMessage[messageIndex.size()];
  +      MessageReference rc[] = new MessageReference[messageIndex.size()];
         java.util.Iterator iter = messageIndex.values().iterator();
         for ( int i = 0; iter.hasNext(); i++ ) {
  -         rc[i] = ( ( IntegrityLog.MessageAddedRecord )iter.next() ).message;
  +         rc[i] = (MessageReference)iter.next();
         }
         return rc;
      }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to