User: pkendall
Date: 01/05/15 00:16:48
Modified: src/main/org/jbossmq/server StartServer.java
PersistenceManager.java JMSDestination.java
Log:
Modifications for abstracting out persistence package.
Revision Changes Path
1.3 +24 -14 jbossmq/src/main/org/jbossmq/server/StartServer.java
Index: StartServer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/StartServer.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- StartServer.java 2001/03/02 01:13:02 1.2
+++ StartServer.java 2001/05/15 07:16:48 1.3
@@ -40,7 +40,7 @@
/**
* Class used to start a JMS service. This can be called from inside another
-
+
* application to start the JMS provider.
*
* @author Norbert Lataille ([EMAIL PROTECTED])
@@ -48,7 +48,7 @@
* @author Vincent Sheffer ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class StartServer implements Runnable
{
@@ -143,10 +143,10 @@
public void run() {
try {
-
+
//Load the property file
InputStream in =
getClass().getClassLoader().getResource("jbossmq.xml").openStream();
- XElement serverCfg = XElement.createFrom(in);
+ XElement serverCfg = XElement.createFrom(in);
in.close();
// Make sure that we loaded the right type of xml file
@@ -166,15 +166,25 @@
UserManager userManager=new UserManager(theServer,
serverCfg.getElement("UserManager"));
theServer.userManager = userManager;
- //Creatye a PersistenceManager object
- PersistenceManager persistenceManager = new
PersistenceManager(theServer, serverCfg.getElement("PersistenceManager"));
+ //Create a PersistenceManager object
+ PersistenceManager persistenceManager;
+ XElement pmcfg = serverCfg.getElement("PersistenceManager");
+ if( pmcfg.getAttribute("class") == null ) {
+ persistenceManager = new
org.jbossmq.persistence.PersistenceManager(theServer, pmcfg);
+ }
+ else {
+ Class pmc = getClass().forName(pmcfg.getAttribute("class"));
+ Class[] types = {theServer.getClass(),
pmcfg.getClass()};
+ Object[] args = {theServer, pmcfg};
+ persistenceManager =
(PersistenceManager)pmc.getConstructor(types).newInstance(args);
+ }
theServer.persistenceManager = persistenceManager;
-
+
registerService(theServer, new
ObjectName(JMSServer.OBJECT_NAME));
//create the known topics
Context subcontext=ctx.createSubcontext("topic");
-
+
Enumeration enum = serverCfg.getElementsNamed("Topic");
while( enum.hasMoreElements() ) {
XElement element = (XElement)enum.nextElement();
@@ -183,11 +193,11 @@
Topic t=theServer.newTopic(name);
subcontext.rebind(name,t);
}
-
+
//create the known queues
subcontext=ctx.createSubcontext("queue");
-
+
enum = serverCfg.getElementsNamed("Queue");
while( enum.hasMoreElements() ) {
XElement element = (XElement)enum.nextElement();
@@ -205,7 +215,7 @@
enum = serverCfg.getElementsNamed("InvocationLayer");
while( enum.hasMoreElements()) {
-
+
XElement element = (XElement)enum.nextElement();
String name = element.getField("Name");
String topicConnectionFactoryJNDI =
element.getField("TopicConnectionFactoryJNDI");
@@ -228,12 +238,12 @@
//(re)bind the connection factories in the JNDI
namespace
ctx.rebind(topicConnectionFactoryJNDI,invocationLayerFactory.spyTopicConnectionFactory);
-
ctx.rebind(queueConnectionFactoryJNDI,invocationLayerFactory.spyQueueConnectionFactory);
+
ctx.rebind(queueConnectionFactoryJNDI,invocationLayerFactory.spyQueueConnectionFactory);
ctx.rebind(xaTopicConnectionFactoryJNDI,invocationLayerFactory.spyXATopicConnectionFactory);
ctx.rebind(xaQueueConnectionFactoryJNDI,invocationLayerFactory.spyXAQueueConnectionFactory);
-
+
}
-
+
System.out.println("Server Version 0.8 Started");
1.4 +136 -234 jbossmq/src/main/org/jbossmq/server/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/PersistenceManager.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- PersistenceManager.java 2001/05/13 08:22:00 1.3
+++ PersistenceManager.java 2001/05/15 07:16:48 1.4
@@ -8,44 +8,31 @@
import javax.jms.JMSException;
-import java.net.URL;
import java.util.HashMap;
-import java.util.TreeSet;
import java.util.Iterator;
import java.util.LinkedList;
import org.jbossmq.xml.XElement;
import org.jbossmq.SpyMessage;
-import org.jbossmq.persistence.SpyTxLog;
-import org.jbossmq.persistence.SpyMessageLog;
import org.jbossmq.SpyDestination;
import org.jbossmq.SpyDistributedConnection;
/**
- * This class manages all persistence related services.
+ * This class provides the base for user-supplied persistence packages.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
- *
- * @version $Revision: 1.3 $
+ * @author Paul Kendall ([EMAIL PROTECTED])
+ *
+ * @version $Revision: 1.4 $
*/
-public class PersistenceManager {
+public abstract class PersistenceManager {
- // The server this persistence manager is providing service for
- JMSServer server;
- // The configuration data for the manager.
- XElement configElement;
- // The directory where persistence data should be stored
- URL dataDirectory;
- // Log file used to store commited transactions.
- SpyTxLog spyTxLog;
- // Maps SpyDestinations to SpyMessageLogs
- HashMap messageLogs = new HashMap();
// Maps (Long)txIds to LinkedList of Runnable tasks
HashMap postCommitTasks = new HashMap();
- // Maps Global transactions to local transactions
- HashMap globalToLocal = new HashMap();
// Maps (Long)txIds to LinkedList of Runnable tasks
HashMap postRollbackTasks = new HashMap();
+ // Maps Global transactions to local transactions
+ HashMap globalToLocal = new HashMap();
class GlobalXID implements Runnable {
SpyDistributedConnection dc;
@@ -55,292 +42,207 @@
this.dc = dc;
this.xid = xid;
}
-
+
public boolean equals(Object obj)
{
- if (obj==null) return false;
+ if (obj==null) return false;
if (obj.getClass()!=GlobalXID.class) return false;
return ((GlobalXID)obj).xid.equals( xid ) &&
((GlobalXID)obj).dc.equals(dc);
}
-
+
public int hashCode() {
return xid.hashCode();
}
public void run() {
synchronized (globalToLocal) {
- globalToLocal.remove(this);
+ globalToLocal.remove(this);
}
}
}
- static class LogInfo {
- SpyMessageLog log;
- SpyDestination destination;
- String queueId;
-
- LogInfo(SpyMessageLog log, SpyDestination destination, String queueId)
{
- this.log=log;
- this.destination=destination;
- this.queueId=queueId;
+ /**
+ * Create and return a unique transaction id.
+ */
+ public final Long createTx() throws javax.jms.JMSException {
+ Long txId = createPersistentTx();
+ synchronized (postCommitTasks) {
+ postCommitTasks.put(txId, new LinkedList());
+ postRollbackTasks.put(txId, new LinkedList());
}
-
+ return txId;
}
-
+
/**
- * PersistenceManager constructor.
+ * Create and return a unique transaction id.
+ *
+ * Given a distributed connection and a transaction id object,
+ * allocate a unique local transaction id if the remote id is not already
+ * known.
*/
- public PersistenceManager(JMSServer server, XElement configElement) throws
javax.jms.JMSException {
+ public final Long createTx(SpyDistributedConnection dc, Object xid) throws
javax.jms.JMSException {
- try {
+ GlobalXID gxid = new GlobalXID(dc, xid);
+ synchronized(globalToLocal){
+ if( globalToLocal.containsKey(gxid) )
+ throw new JMSException("Duplicate transaction from: "+dc.getClientID()+"
xid="+xid);
+ }
- this.server = server;
- this.configElement = configElement;
+ Long txId = createTx();
+ synchronized(globalToLocal){
+ globalToLocal.put(gxid, txId);
+ }
- URL configFile =
getClass().getClassLoader().getResource("jbossmq.xml");
- dataDirectory = new URL(configFile,
configElement.getField("DataDirectory"));
- URL txLogFile = new URL(dataDirectory, "transactions.dat");
- spyTxLog = new SpyTxLog(txLogFile.getFile());
-
- } catch (Exception e) {
- javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
- newE.setLinkedException(e);
- throw newE;
- }
+ //Tasks to remove the global to local mappings on commit/rollback
+ addPostCommitTask(txId, gxid);
+ addPostRollbackTask(txId, gxid);
- }
+ return txId;
+ }
+
+ /**
+ * Commit the transaction to the persistent store.
+ */
+ public final void commitTx(Long txId) throws javax.jms.JMSException {
+
+ LinkedList tasks;
+ synchronized( postCommitTasks ) {
+ tasks = (LinkedList)postCommitTasks.remove(txId);
+ postRollbackTasks.remove(txId);
+ }
+ if( tasks == null )
+ throw new javax.jms.JMSException("Transaction is not active
for commit.");
-
+ commitPersistentTx(txId);
+ synchronized(tasks){
+ Iterator iter = tasks.iterator();
+ while( iter.hasNext() ) {
+ Runnable task = (Runnable)iter.next();
+ task.run();
+ }
+ }
+ }
-
- public void addPostCommitTask(Long txId, Runnable task) throws
javax.jms.JMSException {
+ public final void addPostCommitTask(Long txId, Runnable task) throws
javax.jms.JMSException {
if( txId == null ) {
task.run();
return;
}
-
+
LinkedList tasks;
synchronized (postCommitTasks) {
tasks = (LinkedList) postCommitTasks.get(txId);
}
if (tasks == null)
throw new javax.jms.JMSException("Transaction is not active.");
-
synchronized (tasks) {
tasks.addLast(task);
}
}
-
-
- public void commitTx(Long txId) throws javax.jms.JMSException {
-
- LinkedList tasks;
- synchronized( postCommitTasks ) {
- tasks = (LinkedList)postCommitTasks.remove(txId);
- postRollbackTasks.remove(txId);
- }
- if( tasks == null )
- throw new javax.jms.JMSException("Transaction is not active.");
- spyTxLog.commitTx(txId);
-
- synchronized (tasks) {
- Iterator iter = tasks.iterator();
- while( iter.hasNext() ) {
- Runnable task = (Runnable)iter.next();
- task.run();
- }
- }
-
- }
-
-
- public Long createTx() throws javax.jms.JMSException {
- Long txId = spyTxLog.createTx();
- synchronized (postCommitTasks) {
- postCommitTasks.put(txId, new LinkedList());
- postRollbackTasks.put(txId, new LinkedList());
- }
- return txId;
- }
-
-
-
-
-
-
- public void restore() throws javax.jms.JMSException {
-
- TreeSet commitedTXs = spyTxLog.restore();
- HashMap clone;
- synchronized (messageLogs) {
- clone = (HashMap) messageLogs.clone();
- }
-
- Iterator iter = clone.values().iterator();
- while (iter.hasNext()) {
-
- LogInfo logInfo = (LogInfo)iter.next();
-
- JMSDestination q =
server.getJMSDestination(logInfo.destination);
-
- SpyMessage rebuild[] = logInfo.log.restore(commitedTXs);
-
- //TODO: make sure this lock is good enough
- synchronized (q) {
- for (int i = 0; i < rebuild.length; i++) {
- q.restoreMessage(rebuild[i], logInfo.queueId);
- q.messageIdCounter =
Math.max(q.messageIdCounter, rebuild[i].messageId + 1);
- }
- }
- }
-
- }
-
- public void rollbackTx(Long txId) throws javax.jms.JMSException {
+ /**
+ * Rollback the transaction.
+ */
+ public final void rollbackTx(Long txId) throws javax.jms.JMSException {
LinkedList tasks;
synchronized( postCommitTasks ) {
tasks = (LinkedList)postRollbackTasks.remove(txId);
postCommitTasks.remove(txId);
- }
+ }
if( tasks == null )
- throw new javax.jms.JMSException("Transaction is not active.");
+ throw new javax.jms.JMSException("Transaction is not active
3.");
- spyTxLog.rollbackTx(txId);
+ rollbackPersistentTx(txId);
- synchronized (tasks) {
- Iterator iter = tasks.iterator();
- while( iter.hasNext() ) {
- Runnable task = (Runnable)iter.next();
- task.run();
- }
- }
-
- }
-
- public void addPostRollbackTask(Long txId, Runnable task) throws
javax.jms.JMSException {
+ synchronized(tasks){
+ Iterator iter = tasks.iterator();
+ while( iter.hasNext() ) {
+ Runnable task = (Runnable)iter.next();
+ task.run();
+ }
+ }
+
+ }
+ public final void addPostRollbackTask(Long txId, Runnable task) throws
javax.jms.JMSException {
+
if( txId == null ) {
return;
}
-
+
LinkedList tasks;
- synchronized( postRollbackTasks ) {
+ synchronized( postCommitTasks ) {
tasks = (LinkedList)postRollbackTasks.get(txId);
}
if( tasks == null )
- throw new javax.jms.JMSException("Transaction is not active.");
-
+ throw new javax.jms.JMSException("Transaction is not active
4.");
synchronized (tasks) {
tasks.addLast(task);
}
-
- }
-
- public Long createTx(SpyDistributedConnection dc, Object xid) throws
javax.jms.JMSException {
- GlobalXID gxid = new GlobalXID(dc, xid);
- if( globalToLocal.containsValue(gxid) )
- throw new JMSException("Duplicate transaction from:
"+dc.getClientID()+" xid="+xid);
-
- Long txId = createTx();
- globalToLocal.put(gxid, txId);
-
- //Tasks to remove the global to local mappings on commit/rollback
- addPostCommitTask(txId, gxid);
- addPostRollbackTask(txId, gxid);
-
- return txId;
}
- public Long getPrepared(SpyDistributedConnection dc, Object xid) throws
javax.jms.JMSException {
-
+ /**
+ * Return the local transaction id for a distributed transaction id.
+ */
+ public final Long getPrepared(SpyDistributedConnection dc, Object xid) throws
javax.jms.JMSException {
+
GlobalXID gxid = new GlobalXID(dc, xid);
- Long txid = (Long)globalToLocal.get(gxid);
-
- if( txid == null )
+ Long txid;
+ synchronized(globalToLocal){
+ txid = (Long)globalToLocal.get(gxid);
+ }
+ if( txid == null )
throw new JMSException("Transaction does not exist from:
"+dc.getClientID()+" xid="+xid);
-
- return txid;
- }
-
- public void initQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
-
- try {
-
- URL logFile = new URL(dataDirectory,
dest.toString()+"-"+queueId+".dat");
- SpyMessageLog log = new SpyMessageLog(logFile.getFile());
- LogInfo info = new LogInfo(log, dest, queueId);
-
- messageLogs.put(""+dest+"-"+queueId, info);
-
- } catch (javax.jms.JMSException e) {
- throw e;
- } catch (Exception e) {
- javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
- newE.setLinkedException(e);
- throw newE;
- }
-
- }
-
- public void destroyQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
-
- try {
-
- URL logFile = new URL(dataDirectory,
dest.toString()+"-"+queueId+".dat");
- java.io.File file = new java.io.File(logFile.getFile());
-
- SpyMessageLog log =
(SpyMessageLog)messageLogs.remove(""+dest+"-"+queueId);
- if( log == null )
- throw new JMSException("The persistence log was never
initialized");
- log.close();
-
- file.delete();
-
- } catch (javax.jms.JMSException e) {
- throw e;
- } catch (Exception e) {
- javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
- newE.setLinkedException(e);
- throw newE;
- }
-
- }
-
- public void add(String queueId, org.jbossmq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
-
- LogInfo logInfo;
-
- synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
- }
-
- if (logInfo == null)
- throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
-
- logInfo.log.add(message, txId);
-
- }
-
- public void remove(String queueId, org.jbossmq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
-
- LogInfo logInfo;
+ return txid;
+ }
- synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
- }
+ /**
+ * Restore messages from the persistent queues.
+ */
+ public abstract void restore() throws javax.jms.JMSException;
- if (logInfo == null)
- throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
+ /**
+ * Create and return a unique transaction id.
+ */
+ public abstract Long createPersistentTx() throws javax.jms.JMSException;
- logInfo.log.remove(message, txId);
+ /**
+ * Commit the transaction to the persistent store.
+ */
+ public abstract void commitPersistentTx(Long txId) throws
javax.jms.JMSException;
+
+ /**
+ * Rollback the transaction.
+ */
+ public abstract void rollbackPersistentTx(Long txId) throws
javax.jms.JMSException;
+
+ /**
+ * Initialize the queue.
+ */
+ public abstract void initQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException;
+
+ /**
+ * Remove the queue, and all messages in it, from the persistent store
+ */
+ public abstract void destroyQueue( SpyDestination dest, String queueId )
throws javax.jms.JMSException;
+
+ /**
+ * Add message to the persistent store.
+ * If the message is part of a transaction, txId is not null.
+ */
+ public abstract void add(String queueId, SpyMessage message, Long txId) throws
javax.jms.JMSException;
+
+ /**
+ * Remove message from the persistent store.
+ * If the message is part of a transaction, txId is not null.
+ */
+ public abstract void remove(String queueId, SpyMessage message, Long txId)
throws javax.jms.JMSException;
- }
}
1.3 +58 -53 jbossmq/src/main/org/jbossmq/server/JMSDestination.java
Index: JMSDestination.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/JMSDestination.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- JMSDestination.java 2001/03/02 01:13:02 1.2
+++ JMSDestination.java 2001/05/15 07:16:48 1.3
@@ -23,13 +23,13 @@
import org.jbossmq.persistence.SpyMessageLog;
/**
- * This class is a message queue which is stored (hashed by Destination) on the
+ * This class is a message queue which is stored (hashed by Destination) on the
* JMS provider
- *
+ *
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
- *
- * @version $Revision: 1.2 $
+ *
+ * @version $Revision: 1.3 $
*/
public class JMSDestination {
@@ -41,25 +41,25 @@
ClientConsumer temporaryDestination;
//The JMSServer object
JMSServer server;
- //Am I a queue or a topic
+ //Am I a queue or a topic
boolean isTopic;
//Counter used to number incomming messages. (Used to order the messages.)
long messageIdCounter = Long.MIN_VALUE;
- //Hashmap of ExclusiveQueues
+ //Hashmap of ExclusiveQueues
HashMap exclusiveQueues = new HashMap();
//ShareQueue used for topics
SharedQueue sharedQueue;
- // Constructor ---------------------------------------------------
+ // Constructor ---------------------------------------------------
JMSDestination(SpyDestination dest,ClientConsumer temporary,JMSServer server)
throws JMSException
{
destination=dest;
temporaryDestination=temporary;
this.server=server;
isTopic=dest instanceof SpyTopic;
-
+
sharedQueue = new SharedQueue(server);
-
+
if( !isTopic ) {
exclusiveQueues.put(DEFAULT_QUEUE_ID, new
ExclusiveQueue(server));
@@ -68,12 +68,12 @@
server.persistenceManager.initQueue(dest,
DEFAULT_QUEUE_ID);
}
}
-
+
}
public void addMessage(SpyMessage mes, Long txId) throws JMSException
{
-
+
Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT &&
@@ -81,59 +81,61 @@
throw new JMSException("Cannot write a persistent message to a
temporary destination!");
}
-
+
//Number the message so that we can preserve order of delivery.
- mes.messageId = messageIdCounter++;
+ synchronized(this) {
+ mes.messageId = messageIdCounter++;
+ }
if( isTopic ) {
-
+
sharedQueue.addMessage(mes, txId);
-
+
synchronized (exclusiveQueues) {
-
+
if( exclusiveQueues.size() == 0 )
return;
-
+
Iterator iter = exclusiveQueues.keySet().iterator();
while( iter.hasNext() ) {
-
+
String queueId = (String)iter.next();
ExclusiveQueue eq =
(ExclusiveQueue)exclusiveQueues.get(queueId);
- if( mes.getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT )
+ if( mes.getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT )
server.persistenceManager.add(queueId,
mes, txId);
-
+
eq.addMessage(mes, txId);
-
+
}
}
-
+
} else {
-
+
ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get(
DEFAULT_QUEUE_ID );
- if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT )
+ if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT )
server.persistenceManager.add(DEFAULT_QUEUE_ID, mes,
txId);
-
+
eq.addMessage(mes, txId);
-
+
}
-
+
}
-
- // Package protected ---------------------------------------------
+
+ // Package protected ---------------------------------------------
void addExclusiveConsumer(String queue, ClientConsumer c) throws JMSException {
Log.log(""+this+"->addExclusiveConsumer(queue="+queue+",
consumer="+c+")");
-
+
ExclusiveQueue eq = getExclusiveQueue( queue );
if( eq == null )
throw new JMSException("That destination queue does not
exist");
-
+
eq.addConsumer(c);
}
- // Package protected ---------------------------------------------
+ // Package protected ---------------------------------------------
void addSharedConsumer(ClientConsumer c) throws JMSException {
Log.log(""+this+"->addSharedConsumer(consumer="+c+")");
sharedQueue.addConsumer(c);
@@ -145,16 +147,16 @@
return eq.browse( selector );
}
- // Package protected ---------------------------------------------
+ // Package protected ---------------------------------------------
ExclusiveQueue getExclusiveQueue(String queue) {
-
+
synchronized (exclusiveQueues) {
- return (ExclusiveQueue)exclusiveQueues.get( queue );
+ return (ExclusiveQueue)exclusiveQueues.get( queue );
}
-
+
}
- // Package protected ---------------------------------------------
+ // Package protected ---------------------------------------------
void removeConsumerFromAll(ClientConsumer c) throws JMSException {
Log.log(""+this+"->removeConsumerFromAll(consumer="+c+")");
@@ -167,33 +169,36 @@
eq.removeConsumer(c);
}
}
-
+
}
- // Package protected ---------------------------------------------
+ // Package protected ---------------------------------------------
void removeExclusiveConsumer(String queue, ClientConsumer c) throws
JMSException {
Log.log(""+this+"->removeExclusiveConsumer(queue="+queue+",
consumer="+c+")");
-
+
ExclusiveQueue eq = getExclusiveQueue( queue );
if( eq == null )
throw new JMSException("That destination queue does not
exist");
-
+
eq.removeConsumer(c);
}
- // Package protected ---------------------------------------------
+ // Package protected ---------------------------------------------
void removeSharedConsumer(ClientConsumer c) throws JMSException {
Log.log(""+this+"->removeSharedConsumer(consumer="+c+")");
sharedQueue.removeConsumer(c);
}
//Used to put a message that was added previously to the queue, back in the
queue
- public void restoreMessage(SpyMessage mes, String queueId)
+ public void restoreMessage(SpyMessage mes, String queueId)
{
Log.log(""+this+"->restoreMessage(mes="+mes+",queue="+queueId+")");
- ExclusiveQueue eq = getExclusiveQueue(queueId);
- eq.restoreMessage(mes);
+ synchronized(this) {
+ messageIdCounter = Math.max(messageIdCounter, mes.messageId+1);
+ }
+ ExclusiveQueue eq = getExclusiveQueue(queueId);
+ eq.restoreMessage(mes);
}
public String toString() {
@@ -202,33 +207,33 @@
public void createDurableSubscription(String clientId, String
subscriptionName) throws JMSException
{
- if( !isTopic )
+ if( !isTopic )
throw new JMSException("Not a valid operation on a Queue");
String queueId =
durableSubscriptionToQueueId(clientId,subscriptionName);
-
+
synchronized (exclusiveQueues) {
exclusiveQueues.put(queueId, new ExclusiveQueue(server));
}
-
+
server.persistenceManager.initQueue(destination, queueId);
-
+
}
public void destoryDurableSubscription(String clientId, String
subscriptionName) throws JMSException
{
- if( !isTopic )
+ if( !isTopic )
throw new JMSException("Not a valid operation on a Queue");
String queueId =
durableSubscriptionToQueueId(clientId,subscriptionName);
- synchronized (exclusiveQueues) {
+ synchronized (exclusiveQueues) {
exclusiveQueues.remove(queueId);
}
server.persistenceManager.destroyQueue(destination, queueId);
-
+
}
- static public String durableSubscriptionToQueueId(String clientId, String
subscriptionName)
+ static public String durableSubscriptionToQueueId(String clientId, String
subscriptionName)
{
return clientId+"-"+subscriptionName;
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development