User: hiram
Date: 01/01/10 05:57:45
Modified: src/java/org/spydermq/server SharedQueue.java
PersistenceManager.java JMSServer.java
JMSDestination.java ExclusiveQueue.java
ClientConsumer.java BasicQueue.java
AbstractQueue.java
Added: src/java/org/spydermq/server PersistentMessageEnvelope.java
NonPersistentMessageEnvelope.java
MessageEnvelope.java
Log:
Feature Add: Faster recovery after a spyderMQ server failure
Feature Add: Better server scalability by moving messages that are not in the
working set to secondary storage.
Revision Changes Path
1.4 +6 -6 spyderMQ/src/java/org/spydermq/server/SharedQueue.java
Index: SharedQueue.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/SharedQueue.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SharedQueue.java 2000/12/26 19:54:32 1.3
+++ SharedQueue.java 2001/01/10 13:57:43 1.4
@@ -23,7 +23,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SharedQueue extends BasicQueue {
@@ -38,7 +38,7 @@
public void run() throws JMSException
{
Log.log(""+this+"->run()");
- SpyMessage[] job;
+ MessageEnvelope[] envelopes;
synchronized (messages) {
if( messages.size() == 0 ) {
@@ -46,8 +46,8 @@
return;
}
- job=new SpyMessage[messages.size()];
- job=(SpyMessage[])messages.toArray(job);
+ envelopes=new MessageEnvelope[messages.size()];
+ envelopes=(MessageEnvelope[])messages.toArray(envelopes);
messages.clear();
}
@@ -61,8 +61,8 @@
ClientConsumer consumer = (ClientConsumer)iter.next();
- for( int i=0 ; i < job.length; i++ )
- consumer.addMessage(job[i]);
+ for( int i=0 ; i < envelopes.length; i++ )
+ consumer.addMessage(envelopes[i].getMessage());
consumer.notifyMessageAvailable();
1.6 +62 -27 spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/PersistenceManager.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- PersistenceManager.java 2000/12/27 17:02:22 1.5
+++ PersistenceManager.java 2001/01/10 13:57:43 1.6
@@ -16,17 +16,19 @@
import org.spydermq.xml.XElement;
import org.spydermq.persistence.SpyTxLog;
-import org.spydermq.persistence.SpyMessageLog;
+import org.spydermq.persistence.SpyMessageQueue;
import org.spydermq.SpyDestination;
import org.spydermq.SpyMessage;
import org.spydermq.SpyDistributedConnection;
+import java.io.File;
+
/**
* This class manages all persistence related services.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class PersistenceManager {
@@ -35,10 +37,10 @@
// The configuration data for the manager.
XElement configElement;
// The directory where persistence data should be stored
- URL dataDirectory;
+ File dataDirectory;
// Log file used to store commited transactions.
SpyTxLog spyTxLog;
- // Maps SpyDestinations to SpyMessageLogs
+ // Maps SpyDestinations to SpyMessageQueues
HashMap messageLogs = new HashMap();
// Maps (Long)txIds to LinkedList of Runnable tasks
HashMap postCommitTasks = new HashMap();
@@ -76,11 +78,11 @@
}
static class LogInfo {
- SpyMessageLog log;
+ SpyMessageQueue log;
SpyDestination destination;
String queueId;
- LogInfo(SpyMessageLog log, SpyDestination destination, String queueId)
{
+ LogInfo(SpyMessageQueue log, SpyDestination destination, String
queueId) {
this.log=log;
this.destination=destination;
this.queueId=queueId;
@@ -99,9 +101,10 @@
this.configElement = configElement;
URL configFile =
getClass().getClassLoader().getResource("spyderMQ.xml");
- dataDirectory = new URL(configFile,
configElement.getField("DataDirectory"));
- URL txLogFile = new URL(dataDirectory, "transactions.dat");
- spyTxLog = new SpyTxLog(txLogFile.getFile());
+ dataDirectory = new File(new URL(configFile,
configElement.getField("DataDirectory")).getFile());
+ dataDirectory.mkdirs();
+ File txLogFile = new File(dataDirectory, "transactions.dat");
+ spyTxLog = new SpyTxLog(txLogFile.getAbsolutePath());
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
@@ -112,21 +115,8 @@
}
- public void add(String queueId, org.spydermq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
- LogInfo logInfo;
- synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
- }
-
- if (logInfo == null)
- throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
-
- logInfo.log.add(message, txId);
-
- }
-
public void addPostCommitTask(Long txId, Runnable task) throws
javax.jms.JMSException {
@@ -299,8 +289,9 @@
try {
- URL logFile = new URL(dataDirectory,
dest.toString()+"-"+queueId+".dat");
- SpyMessageLog log = new SpyMessageLog(logFile.getFile());
+ File logFile = new File(dataDirectory,
dest.toString()+"-"+queueId+".dat");
+ File messageDirectory = new File(dataDirectory,
dest.toString()+"-"+queueId+"-messages");
+ SpyMessageQueue log = new SpyMessageQueue(this,
logFile.getAbsolutePath(), messageDirectory.getAbsolutePath());
LogInfo info = new LogInfo(log, dest, queueId);
@@ -320,10 +311,9 @@
try {
- URL logFile = new URL(dataDirectory,
dest.toString()+"-"+queueId+".dat");
- java.io.File file = new java.io.File(logFile.getFile());
+ File file = new File(dataDirectory,
dest.toString()+"-"+queueId+".dat");
- SpyMessageLog log =
(SpyMessageLog)messageLogs.remove(""+dest+"-"+queueId);
+ SpyMessageQueue log =
(SpyMessageQueue)messageLogs.remove(""+dest+"-"+queueId);
if( log == null )
throw new JMSException("The persistence log was never
initialized");
log.close();
@@ -337,6 +327,51 @@
newE.setLinkedException(e);
throw newE;
}
+
+ }
+
+ public java.io.File add(String queueId, org.spydermq.SpyMessage message, Long
txId) throws javax.jms.JMSException {
+
+ LogInfo logInfo;
+
+ synchronized (messageLogs) {
+ logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
+ }
+
+ if (logInfo == null)
+ throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
+
+ return logInfo.log.add(message, txId);
+
+ }
+
+ public File getPersistenceFileFor(SpyMessage mes, String queueId) throws
javax.jms.JMSException {
+
+ LogInfo logInfo;
+
+ synchronized (messageLogs) {
+ logInfo = (LogInfo)
messageLogs.get(""+mes.getJMSDestination()+"-"+queueId);
+ }
+
+ if (logInfo == null)
+ throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
+
+ return logInfo.log.messageIdToFile(mes.messageId);
+
+ }
+
+ public File getSpyMessageQueue(SpyMessage mes, String queueId) throws
javax.jms.JMSException {
+
+ LogInfo logInfo;
+
+ synchronized (messageLogs) {
+ logInfo = (LogInfo)
messageLogs.get(""+mes.getJMSDestination()+"-"+queueId);
+ }
+
+ if (logInfo == null)
+ throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
+
+ return logInfo.log.messageIdToFile(mes.messageId);
}
}
1.12 +3 -1 spyderMQ/src/java/org/spydermq/server/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- JMSServer.java 2001/01/04 23:24:51 1.11
+++ JMSServer.java 2001/01/10 13:57:43 1.12
@@ -27,7 +27,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.11 $
+ * @version $Revision: 1.12 $
*/
public class JMSServer
implements Runnable, JMSServerMBean
@@ -151,6 +151,8 @@
task.run();
} catch (JMSException e) {
Log.error(e);
+ if( e.getLinkedException() != null )
+ Log.error( e.getLinkedException() );
}
}
1.5 +16 -11 spyderMQ/src/java/org/spydermq/server/JMSDestination.java
Index: JMSDestination.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSDestination.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- JMSDestination.java 2001/01/03 23:25:06 1.4
+++ JMSDestination.java 2001/01/10 13:57:43 1.5
@@ -26,7 +26,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class JMSDestination {
@@ -84,7 +84,7 @@
if( isTopic ) {
- sharedQueue.addMessage(mes, txId);
+ sharedQueue.addMessage(new MessageEnvelope(mes), txId);
synchronized (exclusiveQueues) {
@@ -97,10 +97,12 @@
String queueId = (String)iter.next();
ExclusiveQueue eq =
(ExclusiveQueue)exclusiveQueues.get(queueId);
- if( mes.getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT )
- server.persistenceManager.add(queueId,
mes, txId);
-
- eq.addMessage(mes, txId);
+ if( mes.getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT ) {
+ java.io.File f =
server.persistenceManager.add(queueId, mes, txId);
+ eq.addMessage(new
PersistentMessageEnvelope(mes,f), txId);
+ } else {
+ eq.addMessage(new
NonPersistentMessageEnvelope(mes,
server.persistenceManager.getPersistenceFileFor(mes,DEFAULT_QUEUE_ID)), txId);
+ }
}
}
@@ -108,10 +110,13 @@
} else {
ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get(
DEFAULT_QUEUE_ID );
- if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT )
- server.persistenceManager.add(DEFAULT_QUEUE_ID, mes,
txId);
-
- eq.addMessage(mes, txId);
+ if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
+ java.io.File f =
server.persistenceManager.add(DEFAULT_QUEUE_ID, mes, txId);
+ eq.addMessage(new PersistentMessageEnvelope(mes, f),
txId);
+ } else {
+ eq.addMessage(new NonPersistentMessageEnvelope(mes,
server.persistenceManager.getPersistenceFileFor(mes,DEFAULT_QUEUE_ID)), txId);
+ }
+
}
@@ -190,7 +195,7 @@
{
Log.log(""+this+"->restoreMessage(mes="+mes+",queue="+queueId+")");
ExclusiveQueue eq = getExclusiveQueue(queueId);
- eq.restoreMessage(mes);
+ eq.restoreMessage( new MessageEnvelope(mes) );
}
public String toString() {
1.5 +29 -7 spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java
Index: ExclusiveQueue.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- ExclusiveQueue.java 2001/01/03 23:25:06 1.4
+++ ExclusiveQueue.java 2001/01/10 13:57:43 1.5
@@ -24,11 +24,19 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class ExclusiveQueue extends BasicQueue {
+ private static final long MSG_IDLE_TIME_ALLOWENCE = 1000*10; // 10 seconds
+ private long lastActiveConsumerTS=0;
+ // Constructor ---------------------------------------------------
+ public ExclusiveQueue(JMSServer server) throws JMSException
+ {
+ super(server);
+ }
+
// Iterate over the consumers asking them to take messages until they stop
// consuming.
public void run() throws JMSException
@@ -38,7 +46,24 @@
synchronized (messages) {
synchronized (consumers) {
-
+
+ if( consumers.size() ==0 ) {
+ if( (System.currentTimeMillis() -
lastActiveConsumerTS)
+ > MSG_IDLE_TIME_ALLOWENCE ) {
+
+ // No consumers.. move messages to
secondary storage
+ // There have been no consumers for a
while.
+ Iterator i = messages.iterator();
+ while( i.hasNext() ) {
+ MessageEnvelope me =
(MessageEnvelope)i.next();
+ me.moveToSecondaryStorage();
+ }
+
+ }
+
+ return;
+ }
+
LinkedList consumersDone = new LinkedList();
while( consumers.size()!=0 && messages.size() != 0) {
@@ -61,6 +86,8 @@
while( consumersDone.size() != 0 ) {
consumers.addLast(consumersDone.removeFirst());
}
+
+ lastActiveConsumerTS = System.currentTimeMillis();
}
}
@@ -71,9 +98,4 @@
return "ExclusiveQueue";
}
- // Constructor ---------------------------------------------------
- public ExclusiveQueue(JMSServer server) throws JMSException
- {
- super(server);
- }
}
1.7 +9 -8 spyderMQ/src/java/org/spydermq/server/ClientConsumer.java
Index: ClientConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ClientConsumer.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- ClientConsumer.java 2001/01/04 23:24:51 1.6
+++ ClientConsumer.java 2001/01/10 13:57:43 1.7
@@ -27,7 +27,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class ClientConsumer implements Task {
@@ -389,22 +389,23 @@
Iterator i = queue.messages.iterator();
while( i.hasNext() ) {
- SpyMessage message = (SpyMessage)i.next();
+ MessageEnvelope envelope = (MessageEnvelope)i.next();
+ SpyMessage headers = envelope.getHeadersMessage();
- LinkedList l = (LinkedList)destinationSubscriptions.get(
message.getJMSDestination() );
+ LinkedList l = (LinkedList)destinationSubscriptions.get(
headers.getJMSDestination() );
if( l == null ) return false;
Iterator subs = l.iterator();
while( subs.hasNext() ) {
Subscription s = (Subscription)subs.next();
- if( s.accepts( message, true ) ) {
+ if( s.accepts( headers, true ) ) {
s.receiving = false;
i.remove();
ReceiveRequest r = new ReceiveRequest();
- r.message = message;
+ r.message = envelope.getMessage();
r.subscriptionId = new
Integer(s.subscriptionId);
synchronized (messages) {
@@ -412,13 +413,13 @@
}
AcknowledgementRequest ack = new
AcknowledgementRequest();
- ack.destination = message.getJMSDestination();
- ack.messageID = message.getJMSMessageID();
+ ack.destination = headers.getJMSDestination();
+ ack.messageID = headers.getJMSMessageID();
ack.subscriberId = s.subscriptionId;
ack.isAck = false;
synchronized (unacknowledgedMessages) {
- unacknowledgedMessages.put(ack,
message);
+ unacknowledgedMessages.put(ack,
r.message);
}
notifyMessageAvailable();
1.2 +63 -61 spyderMQ/src/java/org/spydermq/server/BasicQueue.java
Index: BasicQueue.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/BasicQueue.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- BasicQueue.java 2000/12/24 01:55:06 1.1
+++ BasicQueue.java 2001/01/10 13:57:44 1.2
@@ -28,7 +28,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
abstract public class BasicQueue implements Task, AbstractQueue {
@@ -47,49 +47,7 @@
this.server=server;
}
-
- //Used to put a message that was added previously to the queue, back in the
queue
- public void restoreMessage(SpyMessage mes)
- {
- //restore a message to the message list...
- synchronized (messages) {
- messages.add(mes);
- }
- notifyMessageAvailable();
- }
-
- public void addMessage(SpyMessage mes, Long txId) throws JMSException
- {
- Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
- // This task gets run to make the message visible in the queue.
- class AddMessagePostCommitTask implements Runnable {
- SpyMessage message;
-
- AddMessagePostCommitTask(SpyMessage m) {
- message = m;
- }
-
- public void run() {
- //restore a message to the message list...
- synchronized (messages) {
- messages.add(message);
- }
- notifyMessageAvailable();
- }
- }
-
- // The message gets added to the queue after the transaction
- // commits (if the message was transacted)
- Runnable task = new AddMessagePostCommitTask(mes);
- if( txId == null ) {
- task.run();
- } else {
- server.persistenceManager.addPostCommitTask(txId, task);
- }
-
- }
-
//
public void addConsumer(ClientConsumer consumer) throws JMSException
{
@@ -104,29 +62,36 @@
public SpyMessage[] browse(String selector) throws JMSException {
if( selector == null ) {
- SpyMessage list[];
+ MessageEnvelope list[];
synchronized (messages) {
- list = new SpyMessage[messages.size()];
- list = (SpyMessage [])messages.toArray(list);
+ list = new MessageEnvelope[messages.size()];
+ list = (MessageEnvelope [])messages.toArray(list);
}
- return list;
+
+ SpyMessage messageList[] = new SpyMessage[list.length];
+ for( int i=0; i < list.length; i++ )
+ messageList[i] = list[i].getMessage();
+ return messageList;
+
} else {
Selector s = new Selector( selector );
LinkedList selection=new LinkedList();
-
+
+ MessageEnvelope list[];
synchronized (messages) {
- Iterator i = messages.iterator();
- while( i.hasNext() ) {
- SpyMessage m = (SpyMessage)i.next();
- if( s.test(m) )
- selection.add(m);
- }
+ list = new MessageEnvelope[messages.size()];
+ list = (MessageEnvelope [])messages.toArray(list);
}
-
- SpyMessage list[];
- list = new SpyMessage[selection.size()];
- list = (SpyMessage [])selection.toArray(list);
- return list;
+
+ for( int i=0; i < list.length; i++ ) {
+ SpyMessage m = list[i].getMessage();
+ if( s.test(m) )
+ selection.add(m);
+ }
+
+ SpyMessage messageList[] = new SpyMessage[selection.size()];
+ messageList = (SpyMessage [])selection.toArray(messageList);
+ return messageList;
}
}
@@ -148,10 +113,10 @@
if (messages.size()==0)
return null;
- SpyMessage m = (SpyMessage)messages.first();
+ MessageEnvelope m = (MessageEnvelope)messages.first();
messages.remove(m);
- return m;
+ return m.getMessage();
}
}
@@ -164,4 +129,41 @@
}
}
+ public void addMessage(MessageEnvelope mes, Long txId) throws JMSException
+ {
+ Log.log(""+this+"->addMessage(mes="+mes.messageId+",txId="+txId+")");
+
+ // This task gets run to make the message visible in the queue.
+ class AddMessagePostCommitTask implements Runnable {
+ MessageEnvelope message;
+
+ AddMessagePostCommitTask(MessageEnvelope m) {
+ message = m;
+ }
+
+ public void run() {
+ //restore a message to the message list...
+ synchronized (messages) {
+ messages.add(message);
+ }
+ notifyMessageAvailable();
+ }
+ }
+
+ // The message gets added to the queue after the transaction
+ // commits
+ Runnable task = new AddMessagePostCommitTask( mes );
+ server.persistenceManager.addPostCommitTask(txId, task);
+
+ }
+
+ //Used to put a message that was added previously to the queue, back in the
queue
+ public void restoreMessage(MessageEnvelope mes)
+ {
+ //restore a message to the message list...
+ synchronized (messages) {
+ messages.add(mes);
+ }
+ notifyMessageAvailable();
+ }
}
1.2 +5 -3 spyderMQ/src/java/org/spydermq/server/AbstractQueue.java
Index: AbstractQueue.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/AbstractQueue.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- AbstractQueue.java 2000/12/23 15:48:25 1.1
+++ AbstractQueue.java 2001/01/10 13:57:44 1.2
@@ -15,11 +15,13 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public interface AbstractQueue {
+
+ public void removeConsumer(ClientConsumer consumer) throws JMSException;
public void addConsumer(ClientConsumer consumer) throws JMSException;
- public void addMessage(SpyMessage mes, Long txId) throws JMSException;
+
+ public void addMessage(MessageEnvelope mes, Long txId) throws JMSException;
void notifyMessageAvailable();
- public void removeConsumer(ClientConsumer consumer) throws JMSException;
}
1.1
spyderMQ/src/java/org/spydermq/server/PersistentMessageEnvelope.java
Index: PersistentMessageEnvelope.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.JMSException;
import java.io.File;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.BufferedInputStream;
import java.io.ObjectInputStream;
import org.spydermq.SpyMessage;
/**
 * This class represents an envelope used to place persistent SpyMessages in.
 *
 * This class will use an existing file to read the message from when loading
 * from secondary storage.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class PersistentMessageEnvelope extends MessageEnvelope {
    // File that already holds the serialized image of the message.
    File persistenceFile;
    // Only ever set to MESSAGE_LOADED_STATE (in the constructor) by this class.
    byte envelopeState;
    private static final byte MESSAGE_LOADED_STATE = 0;
    private static final byte HEADERS_LOADED_STATE = 1;
    private static final byte NOT_LOADED_STATE = 2;

    /**
     * Wraps a message whose serialized image is stored in the given file.
     *
     * @param m            the message to wrap; it starts out loaded in memory
     * @param messageImage the file the message is re-read from after it has
     *                     been moved to secondary storage
     */
    public PersistentMessageEnvelope(SpyMessage m, File messageImage) {
        super(m);
        persistenceFile = messageImage;
        envelopeState = MESSAGE_LOADED_STATE;
    }

    /**
     * Returns the message, reloading it from the persistence file first when
     * it is not currently held in memory.
     *
     * @throws JMSException if the message has to be reloaded and that fails
     */
    public SpyMessage getHeadersMessage() throws JMSException {
        if (message == null)
            loadFromStorage();
        return message;
    }

    /**
     * Returns the full message, reloading it from the persistence file first
     * when it is not currently held in memory.
     *
     * @throws JMSException if the message has to be reloaded and that fails
     */
    public SpyMessage getMessage() throws JMSException {
        if (message == null)
            loadFromStorage();
        return message;
    }

    /**
     * Deserializes the message from the persistence file and restores its
     * server side message id.
     *
     * @throws JMSException wrapping the underlying failure as its linked
     *                      exception
     */
    private void loadFromStorage() throws JMSException {
        FileInputStream fileIn = null;
        try {
            fileIn = new FileInputStream(persistenceFile);
            ObjectInputStream is = new ObjectInputStream(
                new BufferedInputStream(fileIn));
            SpyMessage loaded = (SpyMessage) is.readObject();
            loaded.messageId = messageId;
            // Publish the message only once it is completely restored.
            message = loaded;
        } catch (Exception e) {
            throwJMSException("Could not read a persisted message", e);
        } finally {
            // Release the file handle on every path, including a failed
            // stream header read or a failed readObject().
            if (fileIn != null) {
                try {
                    fileIn.close();
                } catch (IOException ignored) {
                    // Nothing useful can be done about a failed close on a
                    // stream that was only read from.
                }
            }
        }
    }

    /**
     * Drops the in-memory reference to the message. Nothing is written here;
     * the message is later re-read from the persistence file.
     */
    public void moveToSecondaryStorage() throws JMSException {
        message = null;
    }

    /**
     * Always throws a JMSException carrying the given text, with the given
     * exception attached as its linked exception.
     */
    private void throwJMSException(String message, Exception e) throws
            JMSException {
        JMSException newE = new JMSException(message);
        newE.setLinkedException(e);
        throw newE;
    }
}
1.1
spyderMQ/src/java/org/spydermq/server/NonPersistentMessageEnvelope.java
Index: NonPersistentMessageEnvelope.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.JMSException;
import java.io.File;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
import org.spydermq.SpyMessage;
/**
 * This class represents an envelope used to place NonPersistent SpyMessages in.
 *
 * This class will create a temporary file to be used as secondary storage.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class NonPersistentMessageEnvelope extends MessageEnvelope {
    // File the message is written to when it is moved to secondary storage.
    File persistenceFile;
    // Only ever set to MESSAGE_LOADED_STATE (in the constructor) by this class.
    byte envelopeState;
    // True once the message image has been completely written to persistenceFile.
    boolean isPersisted = false;
    private static final byte MESSAGE_LOADED_STATE = 0;
    private static final byte HEADERS_LOADED_STATE = 1;
    private static final byte NOT_LOADED_STATE = 2;

    /**
     * Wraps a message that is currently held in memory.
     *
     * @param m               the message to wrap
     * @param persistenceFile the file to write the message to if it is moved
     *                        to secondary storage
     */
    public NonPersistentMessageEnvelope(SpyMessage m, File persistenceFile) {
        super(m);
        envelopeState = MESSAGE_LOADED_STATE;
        this.persistenceFile = persistenceFile;
    }

    /**
     * Deletes the file written by moveToSecondaryStorage(), if any.
     * Finalization timing is up to the garbage collector, so the file may
     * stay on disk for some time after the envelope becomes unreachable.
     */
    public void finalize() {
        if (isPersisted) {
            persistenceFile.delete();
        }
    }

    /**
     * Returns the message, reloading it from the persistence file first when
     * it is not currently held in memory.
     *
     * @throws JMSException if the message has to be reloaded and that fails
     */
    public SpyMessage getHeadersMessage() throws JMSException {
        if (message == null)
            loadFromStorage();
        return message;
    }

    /**
     * Returns the full message, reloading it from the persistence file first
     * when it is not currently held in memory.
     *
     * @throws JMSException if the message has to be reloaded and that fails
     */
    public SpyMessage getMessage() throws JMSException {
        if (message == null)
            loadFromStorage();
        return message;
    }

    /**
     * Deserializes the message from the persistence file and restores its
     * server side message id.
     *
     * @throws JMSException wrapping the underlying failure as its linked
     *                      exception
     */
    private void loadFromStorage() throws JMSException {
        FileInputStream fileIn = null;
        try {
            fileIn = new FileInputStream(persistenceFile);
            ObjectInputStream is = new ObjectInputStream(new
                BufferedInputStream(fileIn));
            SpyMessage loaded = (SpyMessage) is.readObject();
            loaded.messageId = messageId;
            // Publish the message only once it is completely restored.
            message = loaded;
        } catch (Exception e) {
            throwJMSException("Could not read a persisted message", e);
        } finally {
            // Release the file handle on every path, including a failed
            // stream header read or a failed readObject().
            if (fileIn != null) {
                try {
                    fileIn.close();
                } catch (IOException ignored) {
                    // Nothing useful can be done about a failed close on a
                    // stream that was only read from.
                }
            }
        }
    }

    /**
     * Writes the message to the persistence file (first call only) and then
     * drops the in-memory reference to it.
     *
     * @throws JMSException if the message could not be written; the message
     *                      is then kept in memory
     */
    public void moveToSecondaryStorage() throws JMSException {
        if (!isPersisted) {
            FileOutputStream fileOut = null;
            boolean written = false;
            try {
                fileOut = new FileOutputStream(persistenceFile);
                ObjectOutputStream os = new ObjectOutputStream(new
                    BufferedOutputStream(fileOut));
                os.writeObject(message);
                // close() flushes the buffered bytes, so its failure means
                // the image on disk is incomplete and must count as an error.
                os.close();
                written = true;
                isPersisted = true;
            } catch (IOException e) {
                throwJMSException("Could not persist the message", e);
            } finally {
                if (!written && fileOut != null) {
                    try {
                        fileOut.close();
                    } catch (IOException ignored) {
                        // The write already failed; that failure is the one
                        // reported to the caller.
                    }
                    // finalize() only deletes fully persisted images, so a
                    // partial file would otherwise never be cleaned up.
                    persistenceFile.delete();
                }
            }
        }
        message = null;
    }

    /**
     * Always throws a JMSException carrying the given text, with the given
     * exception attached as its linked exception.
     */
    private void throwJMSException(String message, Exception e) throws
            JMSException {
        JMSException newE = new JMSException(message);
        newE.setLinkedException(e);
        throw newE;
    }
}
1.1 spyderMQ/src/java/org/spydermq/server/MessageEnvelope.java
Index: MessageEnvelope.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.server;
import javax.jms.JMSException;
import java.io.File;
import org.spydermq.SpyMessage;
/**
 * This class represents an envelope used to place SpyMessages in.
 *
 * It holds the minimum amount of information needed by the server
 * to keep the message ordered in its transmission queue.
 *
 * Subclasses such as PersistentMessageEnvelope and NonPersistentMessageEnvelope
 * extend this class so that messages are moved to secondary storage
 * when the message is not part of the working set.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class MessageEnvelope implements Comparable {
    //For ordering in the JMSServerQueue (set on the server side)
    long messageId;
    int jmsPriority;
    // The wrapped message; subclasses set this to null while the message
    // is held in secondary storage.
    SpyMessage message;

    /**
     * Wraps the given message and copies its message id and JMS priority,
     * so that the envelope can be ordered without the message itself.
     *
     * @param m the message to wrap
     */
    public MessageEnvelope(SpyMessage m) {
        message = m;
        messageId = m.messageId;
        jmsPriority = m.getJMSPriority();
    }

    /**
     * Used to order this message with respect to other messages in
     * the server's queue: higher JMS priority sorts first, and within the
     * same priority the lower message id sorts first.
     *
     * @param o the other MessageEnvelope
     * @return a negative value, zero or a positive value when this envelope
     *         sorts before, equal to or after the other one
     */
    public int compareTo(java.lang.Object o) {
        MessageEnvelope me = (MessageEnvelope) o;
        if (jmsPriority > me.jmsPriority) {
            return -1;
        }
        if (jmsPriority < me.jmsPriority) {
            return 1;
        }
        // Compare the ids directly. Narrowing the long difference to an int
        // can flip its sign or turn it into 0 for two distinct ids.
        if (messageId < me.messageId) {
            return -1;
        }
        if (messageId > me.messageId) {
            return 1;
        }
        return 0;
    }

    /**
     * Returns a message with at least the original headers
     * (that can be used to evaluate a Selector).
     */
    public SpyMessage getHeadersMessage() throws JMSException {
        return message;
    }

    /**
     * Returns the full original message.
     */
    public SpyMessage getMessage() throws JMSException {
        return message;
    }

    /**
     * This is a signal that the message is not in the working
     * set and should be moved to secondary storage. This base
     * implementation does nothing.
     */
    public void moveToSecondaryStorage() throws JMSException {
    }
}