User: chirino
Date: 01/10/27 21:07:36
Modified: src/main/org/jboss/mq/server BasicQueue.java JMSQueue.java
JMSTopic.java MessageCache.java
MessageReference.java
Log:
Reorganized the JMS message headers into a Header object so that the MessageCahe can
drop the message body from memory, but keep the headers.
Also fixed a bug witht he Browse() that was introduced with the message cache.
Revision Changes Path
1.8 +8 -6 jbossmq/src/main/org/jboss/mq/server/BasicQueue.java
Index: BasicQueue.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/BasicQueue.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- BasicQueue.java 2001/10/28 01:27:01 1.7
+++ BasicQueue.java 2001/10/28 04:07:35 1.8
@@ -31,7 +31,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author David Maplesden ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
//abstract public class BasicQueue implements Runnable {
public class BasicQueue {
@@ -78,7 +78,9 @@
SpyMessage list[];
synchronized (messages) {
list = new SpyMessage[messages.size()];
- list = (SpyMessage[]) messages.toArray(list);
+ Iterator iter = messages.iterator();
+ for( int i=0; iter.hasNext(); i++ )
+ list[i] = ((MessageReference)iter.next()).getMessage();
}
return list;
} else {
@@ -88,9 +90,9 @@
synchronized (messages) {
Iterator i = messages.iterator();
while (i.hasNext()) {
- SpyMessage m = (SpyMessage) i.next();
- if (s.test(m)) {
- selection.add(m);
+ MessageReference m = (MessageReference) i.next();
+ if (s.test(m.getHeaders())) {
+ selection.add(m.getMessage());
}
}
}
@@ -249,7 +251,7 @@
} else {
- if (this instanceof PersistentQueue && m.getHeaders().getJMSDeliveryMode()
== DeliveryMode.PERSISTENT) {
+ if (this instanceof PersistentQueue && m.getHeaders().jmsDeliveryMode ==
DeliveryMode.PERSISTENT) {
server.getPersistenceManager().remove(m, txId);
}
1.6 +3 -3 jbossmq/src/main/org/jboss/mq/server/JMSQueue.java
Index: JMSQueue.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSQueue.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- JMSQueue.java 2001/10/28 01:27:01 1.5
+++ JMSQueue.java 2001/10/28 04:07:36 1.6
@@ -26,7 +26,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author David Maplesden ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class JMSQueue extends JMSDestination {
@@ -68,7 +68,7 @@
try {
SpyMessage spyMessage = messageRef.getMessage();
synchronized ( this ) {
- messageIdCounter = Math.max( messageIdCounter, spyMessage.messageId +
1 );
+ messageIdCounter = Math.max( messageIdCounter,
spyMessage.header.messageId + 1 );
}
queue.restoreMessage( messageRef);
} catch ( JMSException e ) {
@@ -101,7 +101,7 @@
//Number the message so that we can preserve order of delivery.
synchronized ( this ) {
- mes.messageId = messageIdCounter++;
+ mes.header.messageId = messageIdCounter++;
MessageReference message = server.getMessageCache().add(mes);
queue.addMessage( message, txId );
}
1.7 +7 -7 jbossmq/src/main/org/jboss/mq/server/JMSTopic.java
Index: JMSTopic.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/JMSTopic.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- JMSTopic.java 2001/10/28 01:27:01 1.6
+++ JMSTopic.java 2001/10/28 04:07:36 1.7
@@ -26,7 +26,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author David Maplesden ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class JMSTopic extends JMSDestination {
@@ -110,12 +110,12 @@
try {
SpyMessage spyMessage = messageRef.getMessage();
synchronized (this) {
- messageIdCounter = Math.max(messageIdCounter, spyMessage.messageId + 1);
+ messageIdCounter = Math.max(messageIdCounter,
spyMessage.header.messageId + 1);
}
- if (spyMessage.durableSubscriberID == null) {
+ if (spyMessage.header.durableSubscriberID == null) {
cat.debug("Trying to restore message with null durableSubscriberID");
} else {
- ((BasicQueue)
durQueues.get(spyMessage.durableSubscriberID)).restoreMessage(messageRef);
+ ((BasicQueue)
durQueues.get(spyMessage.header.durableSubscriberID)).restoreMessage(messageRef);
}
} catch (JMSException e) {
cat.error("Could not restore message:", e);
@@ -169,8 +169,8 @@
DurableSubcriptionID id = (DurableSubcriptionID) iter.next();
PersistentQueue q = (PersistentQueue) durQueues.get(id);
SpyMessage clone = message.myClone();
- clone.durableSubscriberID = id;
- clone.messageId = messageId;
+ clone.header.durableSubscriberID = id;
+ clone.header.messageId = messageId;
MessageReference ref = server.getMessageCache().add(clone);
q.addMessage(ref, txId);
}
@@ -180,7 +180,7 @@
while (iter.hasNext()) {
BasicQueue q = (BasicQueue) iter.next();
SpyMessage clone = message.myClone();
- clone.messageId = messageId;
+ clone.header.messageId = messageId;
MessageReference ref = server.getMessageCache().add(clone);
q.addMessage(ref, txId);
}
1.2 +70 -13 jbossmq/src/main/org/jboss/mq/server/MessageCache.java
Index: MessageCache.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/MessageCache.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- MessageCache.java 2001/10/28 01:27:01 1.1
+++ MessageCache.java 2001/10/28 04:07:36 1.2
@@ -23,7 +23,7 @@
* later.
*
* @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a>
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class MessageCache extends ServiceMBeanSupport implements MessageCacheMBean,
MBeanRegistration, Runnable {
@@ -76,7 +76,7 @@
mr.clear();
lruCache.remove(mr);
totalCacheSize--;
- cat.debug("remove lock release");
+ cat.debug("remove lock release");
}
}
@@ -152,7 +152,7 @@
iter.remove();
}
}
- cat.debug("run lock release");
+ cat.debug("run lock release");
}
}
@@ -164,10 +164,10 @@
synchronized public void messageReferenceUsedEvent(MessageReference mh, boolean
wasHard) {
cat.debug("messageReferenceUsedEvent lock aquire");
synchronized (this) {
- if (wasHard)
- lruCache.remove(mh);
- lruCache.addLast(mh);
- cat.debug("messageReferenceUsedEvent lock released");
+ if (wasHard)
+ lruCache.remove(mh);
+ lruCache.addLast(mh);
+ cat.debug("messageReferenceUsedEvent lock released");
}
}
@@ -221,9 +221,7 @@
files[i].delete();
}
- JMSServer server = (JMSServer) getServer().invoke(new
ObjectName(JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] {
- }, new String[] {
- });
+ JMSServer server = (JMSServer) getServer().invoke(new
ObjectName(JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] {}, new
String[] {});
server.setMessageCache(this);
}
@@ -329,13 +327,13 @@
public long getMaxMemoryMark() {
return maxMemoryMark / ONE_MEGABYTE;
}
-
+
/**
* Gets the CurrentMemoryUsage
* @return Returns a long
*/
- public long getCurrentMemoryUsage(){
- return (Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory())/ ONE_MEGABYTE;
+ public long getCurrentMemoryUsage() {
+ return (Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory()) / ONE_MEGABYTE;
}
/**
@@ -351,6 +349,65 @@
*/
public String getName() {
return "MessageCache";
+ }
+
+ /**
+ * This test creates 5000 x 100K messages and places them
+ * in the MessageCache. With out a cache this would be
+ * 500 Megs of memory needed. The cache will allow us to
+ * stay withing 64 Megs of RAM.
+ */
+ public void testBigLoad() throws Exception {
+
+ MessageCache cache = new MessageCache();
+ File tempDir = new File("Temp-" + System.currentTimeMillis());
+ tempDir.mkdirs();
+ cache.setDataDirectory(tempDir.getCanonicalPath());
+ cache.setHighMemoryMark(40);
+ cache.setMaxMemoryMark(60);
+ cache.init();
+ cache.start();
+
+ LinkedList ll = new LinkedList();
+
+ int TEST_SIZE = 5000;
+ // Create a whole bunch of messages.
+ java.util.Random rand = new java.util.Random(System.currentTimeMillis());
+ cat.info("Adding the messages");
+ for (int i = 0; i < TEST_SIZE; i++) {
+ //cat.info("Used
Mem="+(Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
+ org.jboss.mq.SpyBytesMessage bm = new org.jboss.mq.SpyBytesMessage();
+ bm.writeBytes(new byte[1024 * 100]); // 100K messages
+ MessageReference mr = cache.add(bm);
+ ll.add(mr);
+
+ // Randomly pickout messages out of the cache..
+ int pick = rand.nextInt(i + 1);
+ mr = (MessageReference) ll.get(pick);
+ mr.getMessage();
+ }
+
+ cat.info("Used Mem=" + (Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory()));
+ //assertTrue("getTotalCacheSize check", cache.getTotalCacheSize() ==
TEST_SIZE);
+ cat.info("Messages with Hard Refs=" + cache.getHardRefCacheSize());
+ cat.info("Messages with Soft Refs=" + cache.getSoftRefCacheSize());
+
+ cat.info("Removing the messages");
+ Iterator iter = ll.iterator();
+ for (int i = 0; i < TEST_SIZE; i++) {
+ MessageReference mr = (MessageReference) iter.next();
+ iter.remove();
+ cache.remove(mr);
+ }
+
+ cat.info("Stopping");
+ //assertTrue("getTotalCacheSize check", cache.getTotalCacheSize() == 0);
+ cache.stop();
+ //assertTrue("Data directory clean up check", tempDir.listFiles().length ==
0);
+ tempDir.delete();
+
+ cat.info("Cache Hits=" + cache.getCacheHits());
+ cat.info("Cache Misses=" + cache.getCacheMisses());
}
}
1.2 +4 -4 jbossmq/src/main/org/jboss/mq/server/MessageReference.java
Index: MessageReference.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/MessageReference.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- MessageReference.java 2001/10/28 01:27:01 1.1
+++ MessageReference.java 2001/10/28 04:07:36 1.2
@@ -19,7 +19,7 @@
* </ul>
*
* @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a>
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class MessageReference implements Comparable {
org.apache.log4j.Category cat =
org.apache.log4j.Category.getInstance(MessageReference.class);
@@ -50,7 +50,7 @@
this.hardReference = message;
this.referenceId = referenceId;
this.jmsPriority = (byte) message.getJMSPriority();
- this.messageId = message.messageId;
+ this.messageId = message.header.messageId;
}
public SpyMessage getMessage() throws JMSException {
@@ -76,8 +76,8 @@
*
* For now just return the message.
*/
- public SpyMessage getHeaders() throws javax.jms.JMSException {
- return getMessage();
+ public SpyMessage.Header getHeaders() throws javax.jms.JMSException {
+ return getMessage().header;
}
void clear() throws SpyJMSException {
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development