User: chirino 
  Date: 01/10/27 21:07:36

  Modified:    src/main/org/jboss/mq/server BasicQueue.java JMSQueue.java
                        JMSTopic.java MessageCache.java
                        MessageReference.java
  Log:
  Reorganized the JMS message headers into a Header object so that the MessageCache can
  drop the message body from memory, but keep the headers.
  Also fixed a bug with the Browse() that was introduced with the message cache.
  
  Revision  Changes    Path
  1.8       +8 -6      jbossmq/src/main/org/jboss/mq/server/BasicQueue.java
  
  Index: BasicQueue.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- BasicQueue.java   2001/10/28 01:27:01     1.7
  +++ BasicQueue.java   2001/10/28 04:07:35     1.8
  @@ -31,7 +31,7 @@
    * @author     Norbert Lataille ([EMAIL PROTECTED])
    * @author     David Maplesden ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.7 $
  + * @version    $Revision: 1.8 $
    */
   //abstract public class BasicQueue implements Runnable {
   public class BasicQueue {
  @@ -78,7 +78,9 @@
            SpyMessage list[];
            synchronized (messages) {
               list = new SpyMessage[messages.size()];
  -            list = (SpyMessage[]) messages.toArray(list);
  +            Iterator iter = messages.iterator();
  +            for( int i=0; iter.hasNext(); i++ )
  +             list[i] = ((MessageReference)iter.next()).getMessage();
            }
            return list;
         } else {
  @@ -88,9 +90,9 @@
            synchronized (messages) {
               Iterator i = messages.iterator();
               while (i.hasNext()) {
  -               SpyMessage m = (SpyMessage) i.next();
  -               if (s.test(m)) {
  -                  selection.add(m);
  +               MessageReference m = (MessageReference) i.next();
  +               if (s.test(m.getHeaders())) {
  +                  selection.add(m.getMessage());
                  }
               }
            }
  @@ -249,7 +251,7 @@
   
         } else {
   
  -         if (this instanceof PersistentQueue && m.getHeaders().getJMSDeliveryMode() == DeliveryMode.PERSISTENT) {
  +         if (this instanceof PersistentQueue && m.getHeaders().jmsDeliveryMode == DeliveryMode.PERSISTENT) {
               server.getPersistenceManager().remove(m, txId);
            }
   
  
  
  
  1.6       +3 -3      jbossmq/src/main/org/jboss/mq/server/JMSQueue.java
  
  Index: JMSQueue.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSQueue.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- JMSQueue.java     2001/10/28 01:27:01     1.5
  +++ JMSQueue.java     2001/10/28 04:07:36     1.6
  @@ -26,7 +26,7 @@
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @author     David Maplesden ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.5 $
  + * @version    $Revision: 1.6 $
    */
   public class JMSQueue extends JMSDestination {
   
  @@ -68,7 +68,7 @@
          try {
                  SpyMessage spyMessage = messageRef.getMessage();
              synchronized ( this ) {
  -              messageIdCounter = Math.max( messageIdCounter, spyMessage.messageId + 1 );
  +              messageIdCounter = Math.max( messageIdCounter, spyMessage.header.messageId + 1 );
              }
              queue.restoreMessage( messageRef);
          } catch ( JMSException e ) {
  @@ -101,7 +101,7 @@
   
         //Number the message so that we can preserve order of delivery.
         synchronized ( this ) {
  -              mes.messageId = messageIdCounter++;
  +              mes.header.messageId = messageIdCounter++;
                 MessageReference message = server.getMessageCache().add(mes);
            queue.addMessage( message, txId );
         }
  
  
  
  1.7       +7 -7      jbossmq/src/main/org/jboss/mq/server/JMSTopic.java
  
  Index: JMSTopic.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSTopic.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- JMSTopic.java     2001/10/28 01:27:01     1.6
  +++ JMSTopic.java     2001/10/28 04:07:36     1.7
  @@ -26,7 +26,7 @@
    * @author     Hiram Chirino ([EMAIL PROTECTED])
    * @author     David Maplesden ([EMAIL PROTECTED])
    * @created    August 16, 2001
  - * @version    $Revision: 1.6 $
  + * @version    $Revision: 1.7 $
    */
   public class JMSTopic extends JMSDestination {
   
  @@ -110,12 +110,12 @@
         try {
            SpyMessage spyMessage = messageRef.getMessage();
            synchronized (this) {
  -            messageIdCounter = Math.max(messageIdCounter, spyMessage.messageId + 1);
  +            messageIdCounter = Math.max(messageIdCounter, spyMessage.header.messageId + 1);
            }
  -         if (spyMessage.durableSubscriberID == null) {
  +         if (spyMessage.header.durableSubscriberID == null) {
               cat.debug("Trying to restore message with null durableSubscriberID");
            } else {
  -            ((BasicQueue) durQueues.get(spyMessage.durableSubscriberID)).restoreMessage(messageRef);
  +            ((BasicQueue) durQueues.get(spyMessage.header.durableSubscriberID)).restoreMessage(messageRef);
            }
         } catch (JMSException e) {
            cat.error("Could not restore message:", e);
  @@ -169,8 +169,8 @@
                  DurableSubcriptionID id = (DurableSubcriptionID) iter.next();
                  PersistentQueue q = (PersistentQueue) durQueues.get(id);
                  SpyMessage clone = message.myClone();
  -               clone.durableSubscriberID = id;
  -               clone.messageId = messageId;
  +               clone.header.durableSubscriberID = id;
  +               clone.header.messageId = messageId;
                  MessageReference ref = server.getMessageCache().add(clone);
                  q.addMessage(ref, txId);
               }
  @@ -180,7 +180,7 @@
               while (iter.hasNext()) {
                  BasicQueue q = (BasicQueue) iter.next();
                  SpyMessage clone = message.myClone();
  -               clone.messageId = messageId;
  +               clone.header.messageId = messageId;
                  MessageReference ref = server.getMessageCache().add(clone);
                  q.addMessage(ref, txId);
               }
  
  
  
  1.2       +70 -13    jbossmq/src/main/org/jboss/mq/server/MessageCache.java
  
  Index: MessageCache.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/MessageCache.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- MessageCache.java 2001/10/28 01:27:01     1.1
  +++ MessageCache.java 2001/10/28 04:07:36     1.2
  @@ -23,7 +23,7 @@
    * later.
    *
    * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a>
  - * @version    $Revision: 1.1 $
  + * @version    $Revision: 1.2 $
    */
   public class MessageCache extends ServiceMBeanSupport implements MessageCacheMBean, MBeanRegistration, Runnable {
   
  @@ -76,7 +76,7 @@
            mr.clear();
            lruCache.remove(mr);
            totalCacheSize--;
  -      cat.debug("remove lock release");
  +         cat.debug("remove lock release");
         }
      }
   
  @@ -152,7 +152,7 @@
                  iter.remove();
               }
            }
  -      cat.debug("run lock release");
  +         cat.debug("run lock release");
         }
   
      }
  @@ -164,10 +164,10 @@
      synchronized public void messageReferenceUsedEvent(MessageReference mh, boolean wasHard) {
         cat.debug("messageReferenceUsedEvent lock aquire");
         synchronized (this) {
  -           if (wasHard)
  -              lruCache.remove(mh);
  -           lruCache.addLast(mh);
  -      cat.debug("messageReferenceUsedEvent lock released");
  +         if (wasHard)
  +            lruCache.remove(mh);
  +         lruCache.addLast(mh);
  +         cat.debug("messageReferenceUsedEvent lock released");
         }
      }
   
  @@ -221,9 +221,7 @@
            files[i].delete();
         }
   
  -      JMSServer server = (JMSServer) getServer().invoke(new ObjectName(JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] {
  -      }, new String[] {
  -      });
  +      JMSServer server = (JMSServer) getServer().invoke(new ObjectName(JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] {}, new String[] {});
         server.setMessageCache(this);
      }
   
  @@ -329,13 +327,13 @@
      public long getMaxMemoryMark() {
         return maxMemoryMark / ONE_MEGABYTE;
      }
  -   
  +
      /**
       * Gets the CurrentMemoryUsage
       * @return Returns a long
       */
  -   public long getCurrentMemoryUsage(){
  -       return (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())/ ONE_MEGABYTE;
  +   public long getCurrentMemoryUsage() {
  +      return (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / ONE_MEGABYTE;
      }
   
      /**
  @@ -351,6 +349,65 @@
       */
      public String getName() {
         return "MessageCache";
  +   }
  +   
  +   /**
  +    * This test creates 5000 x 100K messages and places them 
  +    * in the MessageCache.  With out a cache this would be
  +    * 500 Megs of memory needed.  The cache will allow us to 
  +    * stay withing 64 Megs of RAM.
  +    */
  +   public void testBigLoad() throws Exception {
  +
  +      MessageCache cache = new MessageCache();
  +      File tempDir = new File("Temp-" + System.currentTimeMillis());
  +      tempDir.mkdirs();
  +      cache.setDataDirectory(tempDir.getCanonicalPath());
  +      cache.setHighMemoryMark(40);
  +      cache.setMaxMemoryMark(60);
  +      cache.init();
  +      cache.start();
  +
  +      LinkedList ll = new LinkedList();
  +
  +      int TEST_SIZE = 5000;
  +      // Create a whole bunch of messages.
  +      java.util.Random rand = new java.util.Random(System.currentTimeMillis());
  +      cat.info("Adding the messages");
  +      for (int i = 0; i < TEST_SIZE; i++) {
  +         //cat.info("Used Mem="+(Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
  +         org.jboss.mq.SpyBytesMessage bm = new org.jboss.mq.SpyBytesMessage();
  +         bm.writeBytes(new byte[1024 * 100]); // 100K messages
  +         MessageReference mr = cache.add(bm);
  +         ll.add(mr);
  +
  +         // Randomly pickout messages out of the cache..
  +         int pick = rand.nextInt(i + 1);
  +         mr = (MessageReference) ll.get(pick);
  +         mr.getMessage();
  +      }
  +
  +      cat.info("Used Mem=" + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()));
  +      //assertTrue("getTotalCacheSize check", cache.getTotalCacheSize() == TEST_SIZE);
  +      cat.info("Messages with Hard Refs=" + cache.getHardRefCacheSize());
  +      cat.info("Messages with Soft Refs=" + cache.getSoftRefCacheSize());
  +
  +      cat.info("Removing the messages");
  +      Iterator iter = ll.iterator();
  +      for (int i = 0; i < TEST_SIZE; i++) {
  +         MessageReference mr = (MessageReference) iter.next();
  +         iter.remove();
  +         cache.remove(mr);
  +      }
  +
  +      cat.info("Stopping");
  +      //assertTrue("getTotalCacheSize check", cache.getTotalCacheSize() == 0);
  +      cache.stop();
  +      //assertTrue("Data directory clean up check", tempDir.listFiles().length == 0);
  +      tempDir.delete();
  +
  +      cat.info("Cache Hits=" + cache.getCacheHits());
  +      cat.info("Cache Misses=" + cache.getCacheMisses());
      }
   
   }
  
  
  
  1.2       +4 -4      jbossmq/src/main/org/jboss/mq/server/MessageReference.java
  
  Index: MessageReference.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/MessageReference.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- MessageReference.java     2001/10/28 01:27:01     1.1
  +++ MessageReference.java     2001/10/28 04:07:36     1.2
  @@ -19,7 +19,7 @@
    * </ul>
    * 
    * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a>
  - * @version    $Revision: 1.1 $
  + * @version    $Revision: 1.2 $
    */
   public class MessageReference implements Comparable {
      org.apache.log4j.Category cat = org.apache.log4j.Category.getInstance(MessageReference.class);
  @@ -50,7 +50,7 @@
         this.hardReference = message;
         this.referenceId = referenceId;
         this.jmsPriority = (byte) message.getJMSPriority();
  -      this.messageId = message.messageId;
  +      this.messageId = message.header.messageId;
      }
   
      public SpyMessage getMessage() throws JMSException {
  @@ -76,8 +76,8 @@
       * 
       * For now just return the message.
       */
  -   public SpyMessage getHeaders() throws javax.jms.JMSException {
  -      return getMessage();
  +   public SpyMessage.Header getHeaders() throws javax.jms.JMSException {
  +      return getMessage().header;
      }
   
      void clear() throws SpyJMSException {
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to