User: chirino
Date: 01/10/27 18:27:01
Modified: src/main/org/jboss/mq/pm/rollinglogged
PersistenceManager.java SpyMessageLog.java
Log:
Committing my initial implementation of a message cache for JBossMQ messages.
This should allow the server to scale so it can hold a larger number of messages.
Revision Changes Path
1.10 +17 -9
jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManager.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- PersistenceManager.java 2001/10/11 02:19:03 1.9
+++ PersistenceManager.java 2001/10/28 01:27:01 1.10
@@ -27,12 +27,13 @@
import org.jboss.mq.server.JMSServer;
import org.jboss.mq.xml.XElement;
import org.jboss.system.ServiceMBeanSupport;
+import org.jboss.mq.server.MessageReference;
/**
* This class manages all persistence related services.
*
* @author David Maplesden ([EMAIL PROTECTED])
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class PersistenceManager extends ServiceMBeanSupport implements
org.jboss.mq.pm.PersistenceManager, PersistenceManagerMBean
{
@@ -252,6 +253,9 @@
File jbossHome = new File(System.getProperty("jboss.system.home"));
dataDirFile = new File(jbossHome, dataDirectory);
+ dataDirFile.mkdirs();
+ if( !dataDirFile.isDirectory() )
+ throw new Exception("The data directory is not valid:
"+dataDirFile.getCanonicalPath());
//Get an InitialContext
JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[]{
@@ -283,8 +287,9 @@
* @param txId Description of Parameter
* @exception javax.jms.JMSException Description of Exception
*/
- public void add(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException
+ public void add(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException
{
+ SpyMessage message = messageRef.getMessage();
//System.out.println("Add message "+Long.toHexString(message.messageId)+" in
trans "+Long.toHexString(txId.longValue())+" to "+message.getJMSDestination());
LogInfo logInfo;
@@ -319,7 +324,7 @@
synchronized (logInfo)
{
logInfo.liveMessages++;
- message.persistData = logInfo;
+ messageRef.persistData = logInfo;
logInfo.log.add(message, txId);
}
if (txId != null)
@@ -385,13 +390,14 @@
* @param txId Description of Parameter
* @exception javax.jms.JMSException Description of Exception
*/
- public void remove(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId)
throws javax.jms.JMSException
+ public void remove(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException
{
//System.out.println("Removing message
"+Long.toHexString(message.messageId)+" in trans
"+Long.toHexString(txId.longValue())+" from "+message.getJMSDestination());
+ SpyMessage message = messageRef.getMessage();
LogInfo logInfo;
- SpyTxLog txLog = ((LogInfo)message.persistData).txLog;
+ SpyTxLog txLog = ((LogInfo)messageRef.persistData).txLog;
synchronized (messageLogs)
{
HashMap logs = (HashMap)messageLogs.get(txLog);
@@ -412,7 +418,7 @@
synchronized (transToTxLogs)
{
TxInfo txInfo = (TxInfo)transToTxLogs.get(txId);
- txInfo.ackMessages.add(message);
+ txInfo.ackMessages.add(messageRef);
}
}
if (txId == null)
@@ -482,7 +488,7 @@
continue;
String key = name.substring(0, name.length() - (sRollOver.length() +
4));
SpyMessageLog messageLog = new SpyMessageLog(dataFiles[i]);
- SpyMessage[] messages = messageLog.restore(commitedTxs);
+ MessageReference[] messages = messageLog.restore(commitedTxs);
SpyDestination dest = (SpyDestination)queues.get(key);
if (dest != null)
{
@@ -499,7 +505,9 @@
messages[j].persistData = info;
if (dest instanceof org.jboss.mq.SpyTopic)
{
- messages[j].durableSubscriberID =
((org.jboss.mq.SpyTopic)dest).getDurableSubscriptionID();
+ SpyMessage mesg = messages[j].getMessage();
+ mesg.durableSubscriberID =
((org.jboss.mq.SpyTopic)dest).getDurableSubscriptionID();
+ messages[j].invalidate(); // since we updated the message
}
q.restoreMessage(messages[j]);
}
@@ -618,7 +626,7 @@
{
for (Iterator it = messages.iterator(); it.hasNext(); )
{
- LogInfo info = ((LogInfo)((SpyMessage)it.next()).persistData);
+ LogInfo info = ((LogInfo)((MessageReference)it.next()).persistData);
synchronized (info)
{
--info.liveMessages;
1.4 +14 -7
jbossmq/src/main/org/jboss/mq/pm/rollinglogged/SpyMessageLog.java
Index: SpyMessageLog.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/rollinglogged/SpyMessageLog.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyMessageLog.java 2001/09/04 02:22:29 1.3
+++ SpyMessageLog.java 2001/10/28 01:27:01 1.4
@@ -11,6 +11,7 @@
import java.io.File;
import javax.jms.JMSException;
import org.jboss.mq.SpyJMSException;
+import org.jboss.mq.server.MessageReference;
import org.jboss.mq.SpyMessage;
@@ -21,7 +22,7 @@
*
* @created August 16, 2001
* @author: Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyMessageLog {
@@ -65,11 +66,12 @@
}
- public synchronized SpyMessage[] restore( java.util.TreeSet commited )
+ public synchronized MessageReference[] restore( java.util.TreeSet commited )
throws JMSException {
java.util.HashMap messageIndex = new java.util.HashMap();
-
+ org.jboss.mq.server.MessageCache cache =
org.jboss.mq.server.JMSServer.getInstance().getMessageCache();
+
try {
java.util.LinkedList objects = transactionLog.toIndex();
@@ -87,7 +89,8 @@
continue;
}
- messageIndex.put( new Long( r.messageId ), o );
+ MessageReference mr = cache.add(r.message);
+ messageIndex.put( new Long( r.messageId ), mr );
} else if ( o instanceof IntegrityLog.MessageRemovedRecord ) {
@@ -99,7 +102,11 @@
continue;
}
- messageIndex.remove( new Long( r.messageId ) );
+ Long txid = new Long( r.messageId );
+ MessageReference mr = (MessageReference)messageIndex.get(txid );
+ messageIndex.remove(txid );
+ if( mr != null )
+ cache.remove(mr);
}
}
@@ -108,10 +115,10 @@
throwJMSException( "Could not rebuild the queue from the queue's
tranaction log.", e );
}
- SpyMessage rc[] = new SpyMessage[messageIndex.size()];
+ MessageReference rc[] = new MessageReference[messageIndex.size()];
java.util.Iterator iter = messageIndex.values().iterator();
for ( int i = 0; iter.hasNext(); i++ ) {
- rc[i] = ( ( IntegrityLog.MessageAddedRecord )iter.next() ).message;
+ rc[i] = (MessageReference)iter.next();
}
return rc;
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development