User: pkendall
Date: 01/05/15 00:15:45
Added: src/main/org/jbossmq/persistence PersistenceManager.java
Log:
Taken & modified from server package to form new persistence logfile based
persistence package.
Revision Changes Path
1.1 jbossmq/src/main/org/jbossmq/persistence/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.persistence;
import javax.jms.JMSException;
import java.net.URL;
import java.util.HashMap;
import java.util.TreeSet;
import java.util.Iterator;
import java.util.LinkedList;
import org.jbossmq.xml.XElement;
import org.jbossmq.server.JMSServer;
import org.jbossmq.server.JMSDestination;
import org.jbossmq.SpyMessage;
import org.jbossmq.SpyDestination;
import org.jbossmq.SpyDistributedConnection;
/**
* This class manages all persistence related services.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class PersistenceManager extends org.jbossmq.server.PersistenceManager {
// The server this persistence manager is providing service for
JMSServer server;
// The configuration data for the manager.
XElement configElement;
// The directory where persistence data should be stored
URL dataDirectory;
// Log file used to store commited transactions.
SpyTxLog spyTxLog;
// Maps SpyDestinations to SpyMessageLogs
HashMap messageLogs = new HashMap();
static class LogInfo {
SpyMessageLog log;
SpyDestination destination;
String queueId;
LogInfo(SpyMessageLog log, SpyDestination destination, String queueId)
{
this.log=log;
this.destination=destination;
this.queueId=queueId;
}
}
/**
* PersistenceManager constructor.
*/
public PersistenceManager(JMSServer server, XElement configElement) throws
javax.jms.JMSException {
try {
this.server = server;
this.configElement = configElement;
URL configFile =
getClass().getClassLoader().getResource("jbossmq.xml");
dataDirectory = new URL(configFile,
configElement.getField("DataDirectory"));
URL txLogFile = new URL(dataDirectory, "transactions.dat");
spyTxLog = new SpyTxLog(txLogFile.getFile());
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
newE.setLinkedException(e);
throw newE;
}
}
public Long createPersistentTx() throws javax.jms.JMSException {
return spyTxLog.createTx();
}
public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
spyTxLog.commitTx(txId);
}
public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
spyTxLog.rollbackTx(txId);
}
public void restore() throws javax.jms.JMSException {
TreeSet commitedTXs = spyTxLog.restore();
HashMap clone;
synchronized (messageLogs) {
clone = (HashMap) messageLogs.clone();
}
Iterator iter = clone.values().iterator();
while (iter.hasNext()) {
LogInfo logInfo = (LogInfo)iter.next();
JMSDestination q =
server.getJMSDestination(logInfo.destination);
SpyMessage rebuild[] = logInfo.log.restore(commitedTXs);
//TODO: make sure this lock is good enough
synchronized (q) {
for (int i = 0; i < rebuild.length; i++) {
q.restoreMessage(rebuild[i], logInfo.queueId);
}
}
}
}
public void initQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
try {
URL logFile = new URL(dataDirectory,
dest.toString()+"-"+queueId+".dat");
SpyMessageLog log = new SpyMessageLog(logFile.getFile());
LogInfo info = new LogInfo(log, dest, queueId);
messageLogs.put(""+dest+"-"+queueId, info);
} catch (javax.jms.JMSException e) {
throw e;
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
newE.setLinkedException(e);
throw newE;
}
}
public void destroyQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
try {
URL logFile = new URL(dataDirectory,
dest.toString()+"-"+queueId+".dat");
java.io.File file = new java.io.File(logFile.getFile());
SpyMessageLog log =
(SpyMessageLog)messageLogs.remove(""+dest+"-"+queueId);
if( log == null )
throw new JMSException("The persistence log was never
initialized");
log.close();
file.delete();
} catch (javax.jms.JMSException e) {
throw e;
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
newE.setLinkedException(e);
throw newE;
}
}
public void add(String queueId, org.jbossmq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
LogInfo logInfo;
synchronized (messageLogs) {
logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
}
if (logInfo == null)
throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
logInfo.log.add(message, txId);
}
public void remove(String queueId, org.jbossmq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
LogInfo logInfo;
synchronized (messageLogs) {
logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
}
if (logInfo == null)
throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
logInfo.log.remove(message, txId);
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development