User: chirino
Date: 01/10/27 18:27:00
Modified: src/main/org/jboss/mq/pm/file MessageLog.java
PersistenceManager.java
Log:
Committing my initial implementation of a message cache for the JBossMQ messages.
This should allow the server to scale so it can hold a larger number of messages.
Revision Changes Path
1.4 +19 -14 jbossmq/src/main/org/jboss/mq/pm/file/MessageLog.java
Index: MessageLog.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/file/MessageLog.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- MessageLog.java 2001/09/04 02:22:42 1.3
+++ MessageLog.java 2001/10/28 01:27:00 1.4
@@ -15,6 +15,8 @@
import java.io.Serializable;
import javax.jms.JMSException;
+import org.jboss.mq.server.MessageReference ;
+import org.jboss.mq.server.JMSServer;
import org.jboss.mq.*;
@@ -24,7 +26,7 @@
*
* @created August 16, 2001
* @author: Paul Kendall ([EMAIL PROTECTED])
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class MessageLog {
@@ -61,7 +63,7 @@
}
- public SpyMessage[] restore( java.util.TreeSet rollBackTXs )
+ public MessageReference[] restore( java.util.TreeSet rollBackTXs )
throws JMSException {
//use sorted map to get queue order right.
java.util.TreeMap messageIndex = new java.util.TreeMap();
@@ -88,17 +90,18 @@
throwJMSException( "Could not rebuild the queue from the queue's
tranaction log.", e );
}
- SpyMessage rc[] = new SpyMessage[messageIndex.size()];
+ MessageReference rc[] = new MessageReference[messageIndex.size()];
java.util.Iterator iter = messageIndex.values().iterator();
for ( int i = 0; iter.hasNext(); i++ ) {
- rc[i] = ( SpyMessage )iter.next();
+ rc[i] = ( MessageReference )iter.next();
}
return rc;
}
- public void add( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
+ public void add( MessageReference messageRef, org.jboss.mq.pm.Tx transactionId )
throws JMSException {
try {
+ SpyMessage message = messageRef.getMessage();
File f;
if ( transactionId == null ) {
f = new File( queueName, message.getJMSMessageID() );
@@ -106,20 +109,20 @@
f = new File( queueName, message.getJMSMessageID() + "." +
transactionId );
}
writeMessageToFile( message, f );
- message.persistData = f;
+ messageRef.persistData = f;
} catch ( IOException e ) {
throwJMSException( "Could not write to the tranaction log.", e );
}
}
- public void finishAdd( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
+ public void finishAdd( MessageReference message, org.jboss.mq.pm.Tx
transactionId )
throws JMSException {
}
- public void finishRemove( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
+ public void finishRemove( MessageReference messageRef, org.jboss.mq.pm.Tx
transactionId )
throws JMSException {
try {
- File file = ( File )message.persistData;
+ File file = ( File )messageRef.persistData;
delete( file );
} catch ( IOException e ) {
throwJMSException( "Could not write to the tranaction log.", e );
@@ -130,17 +133,17 @@
throws JMSException {
}
- public void undoAdd( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
+ public void undoAdd( MessageReference messageRef, org.jboss.mq.pm.Tx
transactionId )
throws JMSException {
try {
- File file = ( File )message.persistData;
+ File file = ( File )messageRef.persistData;
delete( file );
} catch ( IOException e ) {
throwJMSException( "Could not write to the tranaction log.", e );
}
}
- public void undoRemove( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
+ public void undoRemove( MessageReference message, org.jboss.mq.pm.Tx
transactionId )
throws JMSException {
}
@@ -234,8 +237,10 @@
message.readExternal( in );
in.close();
message.messageId = msgId;
- message.persistData = file;
- store.put( new Long( msgId ), message );
+
+ MessageReference mr = JMSServer.getInstance().getMessageCache().add(message);
+ mr.persistData = file;
+ store.put( new Long( msgId ), mr );
}
private void throwJMSException( String message, Exception e )
1.7 +27 -13 jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManager.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- PersistenceManager.java 2001/09/04 02:22:42 1.6
+++ PersistenceManager.java 2001/10/28 01:27:00 1.7
@@ -30,12 +30,14 @@
import org.jboss.mq.server.JMSServer;
import org.jboss.system.ServiceMBeanSupport;
+import org.jboss.mq.SpyMessage;
+import org.jboss.mq.server.MessageReference;
/**
* This class manages all persistence related services for file based
* persistence.
*
* @author Paul Kendall ([EMAIL PROTECTED])
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class PersistenceManager extends ServiceMBeanSupport implements
PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager
{
@@ -115,6 +117,9 @@
{
File jbossHome = new File(System.getProperty("jboss.system.home"));
dataDirFile = new File(jbossHome, dataDirectory);
+ dataDirFile.mkdirs();
+ if( !dataDirFile.isDirectory() )
+ throw new Exception("The data directory is not valid:
"+dataDirFile.getCanonicalPath());
JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[]{}, new String[]{});
server.setPersistenceManager(this);
}
@@ -145,6 +150,12 @@
{
for (int i = 0; i < transactFiles.length; i++)
{
+ // Ignore the queue data directories.
+ if( transactFiles[i].isDirectory() ) {
+ transactFiles[i] = null;
+ continue;
+ }
+
try
{
Long tx = new Long(Long.parseLong(transactFiles[i].getName()));
@@ -183,7 +194,7 @@
{
LogInfo logInfo = (LogInfo)iter.next();
JMSDestination q = server.getJMSDestination(logInfo.destination);
- SpyMessage rebuild[] = logInfo.log.restore(txs);
+ MessageReference rebuild[] = logInfo.log.restore(txs);
//TODO: make sure this lock is good enough
synchronized (q)
{
@@ -191,7 +202,9 @@
{
if (logInfo.destination instanceof org.jboss.mq.SpyTopic)
{
- rebuild[i].durableSubscriberID =
((org.jboss.mq.SpyTopic)logInfo.destination).getDurableSubscriptionID();
+ SpyMessage m = rebuild[i].getMessage();
+ m.durableSubscriberID =
((org.jboss.mq.SpyTopic)logInfo.destination).getDurableSubscriptionID();
+ rebuild[i].invalidate(); // since we did an update.
}
q.restoreMessage(rebuild[i]);
}
@@ -283,8 +296,9 @@
* @param txId Description of Parameter
* @exception javax.jms.JMSException Description of Exception
*/
- public void add(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException
+ public void add(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException
{
+ SpyMessage message = messageRef.getMessage();
LogInfo logInfo;
synchronized (messageLogs)
{
@@ -294,10 +308,10 @@
{
throw new javax.jms.JMSException("Destination was not initalized with the
PersistenceManager");
}
- logInfo.log.add(message, txId);
+ logInfo.log.add(messageRef, txId);
if (txId == null)
{
- logInfo.log.finishAdd(message, txId);
+ logInfo.log.finishAdd(messageRef, txId);
}
else
{
@@ -312,7 +326,7 @@
}
synchronized (info.tasks)
{
- info.tasks.addLast(new Transaction(true, logInfo, message, txId));
+ info.tasks.addLast(new Transaction(true, logInfo, messageRef, txId));
}
}
}
@@ -378,9 +392,9 @@
* @param txId Description of Parameter
* @exception javax.jms.JMSException Description of Exception
*/
- public void remove(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId)
throws javax.jms.JMSException
+ public void remove(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException
{
-
+ SpyMessage message = messageRef.getMessage();
LogInfo logInfo;
synchronized (messageLogs)
@@ -396,7 +410,7 @@
logInfo.log.remove(message, txId);
if (txId == null)
{
- logInfo.log.finishRemove(message, txId);
+ logInfo.log.finishRemove(messageRef, txId);
}
else
{
@@ -421,7 +435,7 @@
}
synchronized (info.tasks)
{
- info.tasks.addLast(new Transaction(false, logInfo, message, txId));
+ info.tasks.addLast(new Transaction(false, logInfo, messageRef, txId));
}
}
@@ -665,7 +679,7 @@
class Transaction
{
private LogInfo logInfo;
- private SpyMessage message;
+ private MessageReference message;
private org.jboss.mq.pm.Tx txId;
private boolean add;
@@ -677,7 +691,7 @@
* @param message Description of Parameter
* @param txId Description of Parameter
*/
- public Transaction(boolean add, LogInfo logInfo, SpyMessage message,
org.jboss.mq.pm.Tx txId)
+ public Transaction(boolean add, LogInfo logInfo, MessageReference message,
org.jboss.mq.pm.Tx txId)
{
this.add = add;
this.logInfo = logInfo;
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development