User: chirino 
  Date: 01/10/27 18:27:00

  Modified:    src/main/org/jboss/mq/pm/file MessageLog.java
                        PersistenceManager.java
  Log:
  Commiting my initial implementation of a message cache for the JBossMQ messages.  
This should allow the server to scale so it can hold a larger number of message.
  
  Revision  Changes    Path
  1.4       +19 -14    jbossmq/src/main/org/jboss/mq/pm/file/MessageLog.java
  
  Index: MessageLog.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/file/MessageLog.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- MessageLog.java   2001/09/04 02:22:42     1.3
  +++ MessageLog.java   2001/10/28 01:27:00     1.4
  @@ -15,6 +15,8 @@
   import java.io.Serializable;
   
   import javax.jms.JMSException;
  +import org.jboss.mq.server.MessageReference ;
  +import org.jboss.mq.server.JMSServer;
   
   import org.jboss.mq.*;
   
  @@ -24,7 +26,7 @@
    *
    * @created    August 16, 2001
    * @author:    Paul Kendall ([EMAIL PROTECTED])
  - * @version    $Revision: 1.3 $
  + * @version    $Revision: 1.4 $
    */
   public class MessageLog {
   
  @@ -61,7 +63,7 @@
      }
   
   
  -   public SpyMessage[] restore( java.util.TreeSet rollBackTXs )
  +   public MessageReference[] restore( java.util.TreeSet rollBackTXs )
         throws JMSException {
         //use sorted map to get queue order right.
         java.util.TreeMap messageIndex = new java.util.TreeMap();
  @@ -88,17 +90,18 @@
            throwJMSException( "Could not rebuild the queue from the queue's 
tranaction log.", e );
         }
   
  -      SpyMessage rc[] = new SpyMessage[messageIndex.size()];
  +      MessageReference rc[] = new MessageReference[messageIndex.size()];
         java.util.Iterator iter = messageIndex.values().iterator();
         for ( int i = 0; iter.hasNext(); i++ ) {
  -         rc[i] = ( SpyMessage )iter.next();
  +         rc[i] = ( MessageReference )iter.next();
         }
         return rc;
      }
   
  -   public void add( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
  +   public void add( MessageReference messageRef, org.jboss.mq.pm.Tx transactionId )
         throws JMSException {
         try {
  +              SpyMessage message = messageRef.getMessage();
            File f;
            if ( transactionId == null ) {
               f = new File( queueName, message.getJMSMessageID() );
  @@ -106,20 +109,20 @@
               f = new File( queueName, message.getJMSMessageID() + "." + 
transactionId );
            }
            writeMessageToFile( message, f );
  -         message.persistData = f;
  +         messageRef.persistData = f;
         } catch ( IOException e ) {
            throwJMSException( "Could not write to the tranaction log.", e );
         }
      }
   
  -   public void finishAdd( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
  +   public void finishAdd( MessageReference message, org.jboss.mq.pm.Tx 
transactionId )
         throws JMSException {
      }
   
  -   public void finishRemove( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
  +   public void finishRemove( MessageReference messageRef, org.jboss.mq.pm.Tx 
transactionId )
         throws JMSException {
         try {
  -         File file = ( File )message.persistData;
  +         File file = ( File )messageRef.persistData;
            delete( file );
         } catch ( IOException e ) {
            throwJMSException( "Could not write to the tranaction log.", e );
  @@ -130,17 +133,17 @@
         throws JMSException {
      }
   
  -   public void undoAdd( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
  +   public void undoAdd( MessageReference messageRef, org.jboss.mq.pm.Tx 
transactionId )
         throws JMSException {
         try {
  -         File file = ( File )message.persistData;
  +         File file = ( File )messageRef.persistData;
            delete( file );
         } catch ( IOException e ) {
            throwJMSException( "Could not write to the tranaction log.", e );
         }
      }
   
  -   public void undoRemove( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
  +   public void undoRemove( MessageReference message, org.jboss.mq.pm.Tx 
transactionId )
         throws JMSException {
      }
   
  @@ -234,8 +237,10 @@
         message.readExternal( in );
         in.close();
         message.messageId = msgId;
  -      message.persistData = file;
  -      store.put( new Long( msgId ), message );
  +      
  +      MessageReference mr = JMSServer.getInstance().getMessageCache().add(message);
  +      mr.persistData = file;
  +      store.put( new Long( msgId ), mr );
      }
   
      private void throwJMSException( String message, Exception e )
  
  
  
  1.7       +27 -13    jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManager.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- PersistenceManager.java   2001/09/04 02:22:42     1.6
  +++ PersistenceManager.java   2001/10/28 01:27:00     1.7
  @@ -30,12 +30,14 @@
   import org.jboss.mq.server.JMSServer;
   import org.jboss.system.ServiceMBeanSupport;
   
  +import org.jboss.mq.SpyMessage;
  +import org.jboss.mq.server.MessageReference;
   /**
    *  This class manages all persistence related services for file based
    *  persistence.
    *
    * @author     Paul Kendall ([EMAIL PROTECTED])
  - * @version    $Revision: 1.6 $
  + * @version    $Revision: 1.7 $
    */
   public class PersistenceManager extends ServiceMBeanSupport implements 
PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager
   {
  @@ -115,6 +117,9 @@
      {
         File jbossHome = new File(System.getProperty("jboss.system.home"));
         dataDirFile = new File(jbossHome, dataDirectory);
  +      dataDirFile.mkdirs();
  +      if( !dataDirFile.isDirectory() )
  +         throw new Exception("The data directory is not valid: 
"+dataDirFile.getCanonicalPath());
         JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[]{}, new String[]{});
         server.setPersistenceManager(this);
      }
  @@ -145,6 +150,12 @@
         {
            for (int i = 0; i < transactFiles.length; i++)
            {
  +             // Ignore the queue data directories.
  +             if( transactFiles[i].isDirectory() ) {
  +                transactFiles[i] = null;
  +                     continue;
  +             }
  +             
               try
               {
                  Long tx = new Long(Long.parseLong(transactFiles[i].getName()));
  @@ -183,7 +194,7 @@
         {
            LogInfo logInfo = (LogInfo)iter.next();
            JMSDestination q = server.getJMSDestination(logInfo.destination);
  -         SpyMessage rebuild[] = logInfo.log.restore(txs);
  +         MessageReference rebuild[] = logInfo.log.restore(txs);
            //TODO: make sure this lock is good enough
            synchronized (q)
            {
  @@ -191,7 +202,9 @@
               {
                  if (logInfo.destination instanceof org.jboss.mq.SpyTopic)
                  {
  -                  rebuild[i].durableSubscriberID = 
((org.jboss.mq.SpyTopic)logInfo.destination).getDurableSubscriptionID();
  +                       SpyMessage m = rebuild[i].getMessage();
  +                  m.durableSubscriberID = 
((org.jboss.mq.SpyTopic)logInfo.destination).getDurableSubscriptionID();
  +                  rebuild[i].invalidate(); // since we did an update.
                  }
                  q.restoreMessage(rebuild[i]);
               }
  @@ -283,8 +296,9 @@
       * @param  txId                        Description of Parameter
       * @exception  javax.jms.JMSException  Description of Exception
       */
  -   public void add(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException
  +   public void add(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException
      {
  +       SpyMessage message = messageRef.getMessage();
         LogInfo logInfo;
         synchronized (messageLogs)
         {
  @@ -294,10 +308,10 @@
         {
            throw new javax.jms.JMSException("Destination was not initalized with the 
PersistenceManager");
         }
  -      logInfo.log.add(message, txId);
  +      logInfo.log.add(messageRef, txId);
         if (txId == null)
         {
  -         logInfo.log.finishAdd(message, txId);
  +         logInfo.log.finishAdd(messageRef, txId);
         }
         else
         {
  @@ -312,7 +326,7 @@
            }
            synchronized (info.tasks)
            {
  -            info.tasks.addLast(new Transaction(true, logInfo, message, txId));
  +            info.tasks.addLast(new Transaction(true, logInfo, messageRef, txId));
            }
         }
      }
  @@ -378,9 +392,9 @@
       * @param  txId                        Description of Parameter
       * @exception  javax.jms.JMSException  Description of Exception
       */
  -   public void remove(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId) 
throws javax.jms.JMSException
  +   public void remove(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException
      {
  -
  +      SpyMessage message = messageRef.getMessage();   
         LogInfo logInfo;
   
         synchronized (messageLogs)
  @@ -396,7 +410,7 @@
         logInfo.log.remove(message, txId);
         if (txId == null)
         {
  -         logInfo.log.finishRemove(message, txId);
  +         logInfo.log.finishRemove(messageRef, txId);
         }
         else
         {
  @@ -421,7 +435,7 @@
            }
            synchronized (info.tasks)
            {
  -            info.tasks.addLast(new Transaction(false, logInfo, message, txId));
  +            info.tasks.addLast(new Transaction(false, logInfo, messageRef, txId));
            }
         }
   
  @@ -665,7 +679,7 @@
      class Transaction
      {
         private LogInfo logInfo;
  -      private SpyMessage message;
  +      private MessageReference message;
         private org.jboss.mq.pm.Tx txId;
         private boolean add;
   
  @@ -677,7 +691,7 @@
          * @param  message  Description of Parameter
          * @param  txId     Description of Parameter
          */
  -      public Transaction(boolean add, LogInfo logInfo, SpyMessage message, 
org.jboss.mq.pm.Tx txId)
  +      public Transaction(boolean add, LogInfo logInfo, MessageReference message, 
org.jboss.mq.pm.Tx txId)
         {
            this.add = add;
            this.logInfo = logInfo;
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to