User: hiram
Date: 00/12/15 19:27:51
Modified: src/java/org/spydermq/server JMSServer.java
JMSServerQueue.java JMSServerQueueReceiver.java
StartServer.java
Added: src/java/org/spydermq/server PersistenceManager.java
Log:
Better persistence and Transactions!
Transactions work like the should, all operations get done, or none at all
(It should even be rolling back persistent messages that were logged that were part
of a transaction that did not complete due to abnormal server termination).
You can also configure in what directory you would like to
store the persistence data at.
Revision Changes Path
1.2 +66 -37 spyderMQ/src/java/org/spydermq/server/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JMSServer.java 2000/12/12 05:58:46 1.1
+++ JMSServer.java 2000/12/16 03:27:50 1.2
@@ -26,20 +26,17 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class JMSServer
implements Runnable, JMSServerMBean
{
-
- // Constants -----------------------------------------------------
-
- //number of threads in the pool (TO DO: this value should be dynamic)
- final int NB_THREADS=1;
- public static final String OBJECT_NAME = "JMS:service=JMSServer";
- // Attributes ----------------------------------------------------
-
+ /////////////////////////////////////////////////////////////////////
+ // Attributes
+ /////////////////////////////////////////////////////////////////////
+ public static final String OBJECT_NAME = "JMS:service=JMSServer";
+ final int NB_THREADS=1;
//messages pending for a Destination ( HashMap of JMSServerQueue objects )
public HashMap messageQueue;
//list of tasks pending ( linked list of JMSServerQueue objects )
@@ -52,7 +49,9 @@
private int lastTemporaryQueue;
//The security manager
SecurityManager securityManager;
-
+ //The persistence manager
+ PersistenceManager persistenceManager;
+
/**
* <code>true</code> when the server is running. <code>false</code> when the
* server should stop running.
@@ -66,8 +65,9 @@
*/
private boolean stopped = true;
- // Constructor ---------------------------------------------------
-
+ /////////////////////////////////////////////////////////////////////
+ // Constructors
+ /////////////////////////////////////////////////////////////////////
public JMSServer(SecurityManager securityManager)
{
@@ -88,6 +88,11 @@
}
+
+ /////////////////////////////////////////////////////////////////////
+ // Public Methods
+ /////////////////////////////////////////////////////////////////////
+
/**
* Returns <code>false</code> if the JMS server is currently
* running and handling requests, <code>true</code> otherwise.
@@ -100,11 +105,8 @@
return this.stopped;
}
- // Public --------------------------------------------------------
-
//This is a correct threading system, but this is not ideal...
- //We should let threads cycle through the JMSServerQueue list, and
synchronized on the queue they are working on.
-
+ //We should let threads cycle through the JMSServerQueue list, and
synchronized on the queue they are working on.
public void run() {
while (alive) {
JMSServerQueue queue = null;
@@ -326,24 +328,15 @@
//Sent by a client to Ack or Nack a message.
public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item) throws JMSException
- {
- JMSServerQueue
serverQueue=(JMSServerQueue)messageQueue.get(item.jmsDestination);
- if (serverQueue==null) throw new JMSException("Destination does not
exist: "+item.jmsDestination);
-
- serverQueue.acknowledge(dc, item.jmsMessageID, item.isAck);
+ {
+ acknowledge(dc, item, null);
}
//A connection has sent a new message
public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws
JMSException
{
-
- Log.notice("INCOMING: "+dc.getClientID()+" => "+val.jmsDestination);
- JMSServerQueue
queue=(JMSServerQueue)messageQueue.get(val.jmsDestination);
- if (queue==null) throw new JMSException("This destination does not
exist !");
- //Add the message to the queue
- queue.addMessage(val);
-
+ addMessage( dc, val, null);
}
public void connectionListening(SpyDistributedConnection dc,boolean
mode,Destination dest) throws JMSException
@@ -380,24 +373,33 @@
}
/**
- * TODO: The following function has to be performed as a Unit Of Work.
+ * The following function performs a Unit Of Work.
*
- * for now we just do a quick hack to make it work most of the time.
*/
public void transact(SpyDistributedConnection dc, Transaction t) throws
JMSException {
- if( t.messages != null ) {
- for( int i=0; i < t.messages.length; i++ ) {
- addMessage(dc, t.messages[i]);
+ Long txId = persistenceManager.createTx();
+
+ try {
+
+ if( t.messages != null ) {
+ for( int i=0; i < t.messages.length; i++ ) {
+ addMessage(dc, t.messages[i], txId);
+ }
}
- }
- if( t.acks != null ) {
- for( int i=0; i < t.acks.length; i++ ) {
- acknowledge(dc, t.acks[i]);
+ if( t.acks != null ) {
+ for( int i=0; i < t.acks.length; i++ ) {
+ acknowledge(dc, t.acks[i], txId);
+ }
}
+
+ persistenceManager.commitTx(txId);
+
+ } catch ( JMSException e ) {
+ persistenceManager.rollbackTx(txId);
+ throw e;
}
-
}
@@ -410,4 +412,31 @@
queue.removeSubscriber(dc,null);
}
+ //Sent by a client to Ack or Nack a message.
+ public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item, Long txId) throws JMSException
+ {
+ JMSServerQueue
serverQueue=(JMSServerQueue)messageQueue.get(item.jmsDestination);
+ if (serverQueue==null) throw new JMSException("Destination does not
exist: "+item.jmsDestination);
+ serverQueue.acknowledge(dc, item, txId);
+ }
+
+ //A connection has sent a new message
+ public void addMessage(SpyDistributedConnection dc, SpyMessage val, Long txId)
throws JMSException
+ {
+
+ Log.notice("INCOMING: (TX="+txId+")"+dc.getClientID()+" =>
"+val.jmsDestination);
+ JMSServerQueue
queue=(JMSServerQueue)messageQueue.get(val.jmsDestination);
+ if (queue==null) throw new JMSException("This destination does not
exist !");
+ //Add the message to the queue
+ queue.addMessage(val, txId);
+
+ }
+
+ public JMSServerQueue getServerQueue(SpyDestination d) throws JMSException
+ {
+ JMSServerQueue queue=(JMSServerQueue)messageQueue.get(d);
+ if (queue==null) throw new JMSException("This destination does not
exist !");
+ return queue;
+ }
+
}
1.2 +63 -64 spyderMQ/src/java/org/spydermq/server/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServerQueue.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JMSServerQueue.java 2000/12/12 05:58:45 1.1
+++ JMSServerQueue.java 2000/12/16 03:27:50 1.2
@@ -25,7 +25,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class JMSServerQueue {
// Attributes ----------------------------------------------------
@@ -44,7 +44,7 @@
//If this is linked to a temporaryDestination,
temporaryDestination=DistributedConnection of the owner, otherwise it's null
SpyDistributedConnection temporaryDestination;
//The JMSServer object
- private JMSServer server;
+ JMSServer server;
//Am I a queue or a topic
boolean isTopic;
@@ -58,8 +58,7 @@
// Should we use the round robin aproach to pick the next reciver of a p2p
message?
private boolean useRoundRobinMessageDistribution = true;
- // Used to log the persistent messages.
- SpyMessageLog spyMessageLog;
+
// Constructor ---------------------------------------------------
@@ -74,15 +73,10 @@
this.server=server;
isTopic=dest instanceof SpyTopic;
listeners=0;
-
- spyMessageLog = new SpyMessageLog(
dest.getName()+"-transaction-log.dat" );
- SpyMessage[] rebuild = spyMessageLog.rebuildMessagesFromLog();
- for( int i=0; i < rebuild.length; i++ ) {
- restoreMessage( rebuild[i] );
- messageIdCounter = Math.max( messageIdCounter,
rebuild[i].messageId+1 );
- }
- Log.notice("Restored "+rebuild.length+" messages to "+dest.getName()+"
from the transaction log");
+ if( !isTopic )
+ server.persistenceManager.initQueue(dest);
+
}
@@ -122,37 +116,8 @@
}
}
-
-
- public void addMessage(SpyMessage mes) throws JMSException
- {
- //Add a message to the message list...
- synchronized (messages)
- {
- //Add the message to the queue
- //messages is now an ordered tree. The order depends
- //first on the priority and then on messageId
- mes.messageId = messageIdCounter++;
-
- if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
- spyMessageLog.logAddMessage(mes);
- spyMessageLog.commit();
- }
-
- messages.add(mes);
-
- if (isTopic) {
- //if a thread is already working on this destination,
I don't have to myself to the taskqueue
- if (!threadWorking) notifyWorkers();
- } else {
- if (listeners!=0&&!threadWorking) notifyWorkers();
- }
-
- }
- }
-
-
+
//Clear the message queue
synchronized SpyMessage[] startWork()
{
@@ -295,16 +260,6 @@
//Clear the message queue
SpyMessage[] msgs=startWork();
-
- boolean msgRemoved=false;
- for( int i=0; i < msgs.length; i++ ) {
- if( msgs[i].getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT ) {
- spyMessageLog.logRemoveMessage(msgs[i]);
- msgRemoved=true;
- }
- }
- if( msgRemoved )
- spyMessageLog.commit();
//Let the thread do its work
if (msgs.length == 1) {
@@ -344,8 +299,7 @@
if (mes.isOutdated()) {
if( mes.getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT ) {
-
spyMessageLog.logRemoveMessage(mes);
- spyMessageLog.commit();
+
server.persistenceManager.remove(destination,mes, null);
}
continue;
}
@@ -419,18 +373,7 @@
}
}
-
-
- public void acknowledge(SpyDistributedConnection dc, String messageId, boolean
isAck) throws JMSException {
-
- JMSServerQueueReceiver qr =
(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
- if( qr==null )
- throw new JMSException("You have not subscribed to this
destination.");
-
- qr.acknowledge(messageId, isAck);
-
- }
@@ -559,5 +502,61 @@
}
}
+
+ public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item, Long txId) throws JMSException {
+
+ JMSServerQueueReceiver qr =
(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
+ if( qr==null )
+ throw new JMSException("You have not subscribed to this
destination.");
+
+ qr.acknowledge(item.jmsMessageID, item.isAck, txId);
+
+ }
+
+
+ public void addMessage(SpyMessage mes, Long txId) throws JMSException
+ {
+
+ // This task gets run to make the message visible in the queue.
+ class AddMessagePostCommitTask implements Runnable {
+ SpyMessage message;
+
+ AddMessagePostCommitTask(SpyMessage m) {
+ message = m;
+ }
+
+ public void run() {
+ synchronized (messages)
+ {
+ //Add the message to the queue
+ messages.add(message);
+ if (isTopic) {
+ //if a thread is already working on
this destination, I don't have to myself to the taskqueue
+ if (!threadWorking) notifyWorkers();
+ } else {
+ if (listeners!=0&&!threadWorking)
notifyWorkers();
+ }
+ }
+ }
+ }
+
+ //messages is now an ordered tree. The order depends
+ //first on the priority and then on messageId
+ mes.messageId = messageIdCounter++;
+
+ //We are only persisting queues for now.
+ if( !isTopic && mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT )
+ server.persistenceManager.add(destination, mes, txId);
+
+ // The message gets added to the queue after the transaction
+ // commits (if the message was transacted)
+ Runnable task = new AddMessagePostCommitTask(mes);
+ if( txId == null ) {
+ task.run();
+ } else {
+ server.persistenceManager.addPostCommitTask(txId, task);
+ }
+
+ }
}
1.2 +48 -30
spyderMQ/src/java/org/spydermq/server/JMSServerQueueReceiver.java
Index: JMSServerQueueReceiver.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServerQueueReceiver.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JMSServerQueueReceiver.java 2000/12/12 05:58:43 1.1
+++ JMSServerQueueReceiver.java 2000/12/16 03:27:50 1.2
@@ -22,7 +22,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class JMSServerQueueReceiver implements Serializable {
@@ -51,37 +51,7 @@
}
- void acknowledge(String messageId, boolean ack) throws javax.jms.JMSException {
- SpyMessage m;
- synchronized (unacknowledgedMessages) {
- m = (SpyMessage) unacknowledgedMessages.remove(messageId);
- }
- if (m == null)
- return;
-
- if (jmsSeverQueue.isTopic) {
- // Not sure how we should handle the topic case.
- // On a negative acknowledge, we don't want to
- // add it back to the topic since other
- // receivers might get a duplicate duplicate message.
- } else {
- // Was it a negative acknowledge??
- if (!ack) {
- Log.log("Restoring message: " + m.messageId);
- jmsSeverQueue.restoreMessage(m);
- } else {
-
- if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT
) {
-
jmsSeverQueue.spyMessageLog.logRemoveMessage(m);
- jmsSeverQueue.spyMessageLog.commit();
- }
-
- Log.log("Message Ack: " + m.messageId);
- }
- }
-
- }
// The connection is accepting new messages if there
@@ -174,4 +144,52 @@
}
}
+ void acknowledge(String messageId, boolean ack, Long txId) throws
javax.jms.JMSException {
+
+ // This task gets run to place the neg ack a messge (place it back on
the queue)
+ class RestoreMessageTask implements Runnable {
+ SpyMessage message;
+
+ RestoreMessageTask(SpyMessage m) {
+ message = m;
+ }
+
+ public void run() {
+ Log.log("Restoring message: " + message.messageId);
+ jmsSeverQueue.restoreMessage(message);
+ }
+ }
+
+ SpyMessage m;
+ synchronized (unacknowledgedMessages) {
+ m = (SpyMessage) unacknowledgedMessages.remove(messageId);
+ }
+
+ if (m == null)
+ return;
+
+ if (jmsSeverQueue.isTopic) {
+ // Not sure how we should handle the topic case.
+ // On a negative acknowledge, we don't want to
+ // add it back to the topic since other
+ // receivers might get a duplicate message.
+ } else {
+ // Was it a negative acknowledge??
+ if (!ack) {
+ Runnable task = new RestoreMessageTask(m);
+ if( txId == null )
+ task.run();
+ else
+
jmsSeverQueue.server.persistenceManager.addPostCommitTask(txId, task);
+
+ } else {
+
+ if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT
) {
+
jmsSeverQueue.server.persistenceManager.remove(jmsSeverQueue.destination, m, txId);
+ }
+ Log.log("Message Ack: " + m.messageId);
+ }
+ }
+
+ }
}
1.4 +17 -3 spyderMQ/src/java/org/spydermq/server/StartServer.java
Index: StartServer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/StartServer.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- StartServer.java 2000/12/13 00:34:06 1.3
+++ StartServer.java 2000/12/16 03:27:50 1.4
@@ -36,9 +36,11 @@
import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
import org.spydermq.SpyTopicConnectionFactory;
import org.spydermq.xml.XElement;
+import org.spydermq.persistence.SpyTxLog;
/**
* Class used to start a JMS service. This can be called from inside another
+
* application to start the JMS provider.
*
* @author Norbert Lataille ([EMAIL PROTECTED])
@@ -46,7 +48,7 @@
* @author Vincent Sheffer ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class StartServer implements Runnable
{
@@ -164,6 +166,10 @@
//Create the JMSServer object
theServer = new JMSServer(securityManager);
+
+ PersistenceManager persistenceManager = new
PersistenceManager(theServer, serverCfg.getElement("PersistenceManager"));
+ theServer.persistenceManager = persistenceManager;
+
registerService(theServer, new
ObjectName(JMSServer.OBJECT_NAME));
//create the known topics
@@ -175,7 +181,7 @@
String name = element.getField("Name");
Topic t=theServer.newTopic(name);
- subcontext.rebind(name,t);
+ subcontext.rebind(name,t);
}
@@ -205,6 +211,9 @@
}
}
+ // Restore the persistent messages to thie queues.
+ theServer.persistenceManager.restore();
+
iter = serverCfg.getElementsNamed("InvocationLayer");
while( iter.hasNext() ) {
@@ -236,8 +245,13 @@
} catch (Exception e) {
System.err.println("Cannot start the JMS server !
"+e.getMessage());
- System.err.println(e);
+ e.printStackTrace(System.err);
+ if( e instanceof JMSException ) {
+ System.err.println("Linked Exception:");
+
((JMSException)e).getLinkedException().printStackTrace(System.err);
+ }
}
}
+ SpyTxLog spyTxManager;
}
1.1 spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.JMSException;
import java.net.URL;
import java.util.HashMap;
import java.util.TreeSet;
import java.util.Iterator;
import java.util.LinkedList;
import org.spydermq.xml.XElement;
import org.spydermq.persistence.SpyTxLog;
import org.spydermq.persistence.SpyMessageLog;
import org.spydermq.SpyDestination;
import org.spydermq.SpyMessage;
/**
* This class manages all persistence related services.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class PersistenceManager {
// The server this persistence manager is providing service for
JMSServer server;
// The configuration data for the manager.
XElement configElement;
// The directory where persistence data should be stored
URL dataDirectory;
// Log file used to store commited transactions.
SpyTxLog spyTxLog;
// Maps SpyDestinations to SpyMessageLogs
HashMap messageLogs = new HashMap();
// Maps (Long)txIds to LinkedList of Runnable tasks
HashMap postCommitTasks = new HashMap();
/**
* PersistenceManager constructor.
*/
public PersistenceManager(JMSServer server, XElement configElement) throws
javax.jms.JMSException {
try {
this.server = server;
this.configElement = configElement;
URL configFile =
getClass().getClassLoader().getResource("spyderMQ.xml");
dataDirectory = new URL(configFile,
configElement.getField("DataDirectory"));
URL txLogFile = new URL(dataDirectory, "transactions.dat");
spyTxLog = new SpyTxLog(txLogFile.getFile());
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
newE.setLinkedException(e);
throw newE;
}
}
public void add(org.spydermq.SpyDestination dest, org.spydermq.SpyMessage
message, Long txId) throws javax.jms.JMSException {
SpyMessageLog log;
synchronized (messageLogs) {
log = (SpyMessageLog) messageLogs.get(dest);
}
if (log == null)
throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
synchronized (log) {
log.add(message, txId);
}
}
public void addPostCommitTask(Long txId, Runnable task) throws
javax.jms.JMSException {
LinkedList tasks;
synchronized (postCommitTasks) {
tasks = (LinkedList) postCommitTasks.get(txId);
}
if (tasks == null)
throw new javax.jms.JMSException("Transaction is not active.");
synchronized (tasks) {
tasks.addLast(task);
}
}
public void commitTx(Long txId) throws javax.jms.JMSException {
LinkedList tasks;
synchronized (postCommitTasks) {
tasks = (LinkedList) postCommitTasks.remove(txId);
}
if (tasks == null)
throw new javax.jms.JMSException("Transaction is not active.");
spyTxLog.commitTx(txId);
synchronized (tasks) {
Iterator iter = tasks.iterator();
while (iter.hasNext()) {
Runnable task = (Runnable) iter.next();
task.run();
}
}
}
public Long createTx() throws javax.jms.JMSException {
Long txId = spyTxLog.createTx();
synchronized (postCommitTasks) {
postCommitTasks.put(txId, new LinkedList());
}
return txId;
}
public void initQueue(org.spydermq.SpyDestination dest) throws
javax.jms.JMSException {
try {
URL logFile = new URL(dataDirectory, dest.getName() + "-queue"
+ ".dat");
SpyMessageLog log = new SpyMessageLog(logFile.getFile());
messageLogs.put(dest, log);
} catch (javax.jms.JMSException e) {
throw e;
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
newE.setLinkedException(e);
throw newE;
}
}
public void remove(org.spydermq.SpyDestination dest, org.spydermq.SpyMessage
message, Long txId) throws javax.jms.JMSException {
SpyMessageLog log;
synchronized (messageLogs) {
log = (SpyMessageLog) messageLogs.get(dest);
}
if (log == null)
throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
log.remove(message, null);
}
public void restore() throws javax.jms.JMSException {
TreeSet commitedTXs = spyTxLog.restore();
HashMap clone;
synchronized (messageLogs) {
clone = (HashMap) messageLogs.clone();
}
Iterator iter = clone.keySet().iterator();
while (iter.hasNext()) {
SpyDestination dest = (SpyDestination) iter.next();
JMSServerQueue q = server.getServerQueue(dest);
SpyMessageLog log = (SpyMessageLog) clone.get(dest);
SpyMessage rebuild[] = log.restore(commitedTXs);
//TODO: make sure this sync lock is good enough
synchronized (q) {
for (int i = 0; i < rebuild.length; i++) {
q.restoreMessage(rebuild[i]);
q.messageIdCounter =
Math.max(q.messageIdCounter, rebuild[i].messageId + 1);
}
}
}
}
public void rollbackTx(Long txId) throws javax.jms.JMSException {
LinkedList tasks;
synchronized (postCommitTasks) {
tasks = (LinkedList) postCommitTasks.remove(txId);
}
if (tasks == null)
throw new javax.jms.JMSException("Transaction is not active.");
spyTxLog.rollbackTx(txId);
}
}