User: chirino
Date: 01/07/27 17:33:38
Modified: src/main/org/jbossmq/pm/file PersistenceManager.java
PersistenceManagerMBean.java
Log:
Once again many changes.
- The logic that handled the processing of queue and topic messages
was separated out more to make it easier to follow.
- A QueuedTask class was created to avoid unneeded processing of queues.
- The interface between the client-server-queues-persistence manager to handle
DurableSubscription was too verbose; created a DurableSubscripton class, and now
SpyTopics can be inspected to see if they are being used as a DurableSubscription.
- The MBeans that add queues and topics make it simpler to configure a queue/topic.
Revision Changes Path
1.2 +109 -100 jbossmq/src/main/org/jbossmq/pm/file/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/file/PersistenceManager.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- PersistenceManager.java 2001/07/11 02:52:16 1.1
+++ PersistenceManager.java 2001/07/28 00:33:38 1.2
@@ -34,7 +34,7 @@
*
* @author Paul Kendall ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class PersistenceManager extends ServiceMBeanSupport implements
org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean, MBeanRegistration,
Serializable {
@@ -51,12 +51,10 @@
static class LogInfo {
MessageLog log;
SpyDestination destination;
- String queueId;
- LogInfo(MessageLog log, SpyDestination destination, String queueId) {
+ LogInfo(MessageLog log, SpyDestination destination) {
this.log= log;
this.destination= destination;
- this.queueId= queueId;
}
}
@@ -106,109 +104,13 @@
txLog.rollbackTx(txId);
}
- public void initQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
- try {
-
- URL logDir = new URL(dataDirURL, dest.toString()+"-"+queueId);
- MessageLog log = new MessageLog(logDir.getFile());
-
- LogInfo info = new LogInfo(log, dest, queueId);
-
- synchronized(messageLogs){
- messageLogs.put(""+dest+"-"+queueId, info);
- }
-
- } catch (javax.jms.JMSException e) {
- throw e;
- } catch (Exception e) {
- javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
- newE.setLinkedException(e);
- throw newE;
- }
-
- }
-
- public void destroyQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
-
- try {
-
- URL logDir = new URL(dataDirURL, dest.toString()+"-"+queueId);
- java.io.File file = new java.io.File(logDir.getFile());
- LogInfo logInfo;
- synchronized(messageLogs){
- logInfo = (LogInfo)messageLogs.remove(""+dest+"-"+queueId);
- }
- if( logInfo == null )
- throw new JMSException("The persistence log was never
initialized");
- logInfo.log.close();
- file.delete();
- } catch (javax.jms.JMSException e) {
- throw e;
- } catch (Exception e) {
- javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
- newE.setLinkedException(e);
- throw newE;
- }
-
- }
- public void add(String queueId, org.jbossmq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
-
- LogInfo logInfo;
-
- synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
- }
- if (logInfo == null)
- throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
-
- logInfo.log.add(message, txId);
-
- if( txId != null ) {
- LinkedList tasks;
- synchronized( transactedTasks ) {
- tasks = (LinkedList)transactedTasks.get(txId);
- }
- if( tasks == null )
- throw new javax.jms.JMSException("Transaction is not active 5.");
- synchronized(tasks){
- tasks.addLast(new Transaction(true, logInfo, message, txId));
- }
- }
-
- }
-
- public void remove(String queueId, org.jbossmq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
-
- LogInfo logInfo;
-
- synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
- }
-
- if (logInfo == null)
- throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
-
- if( txId == null )
- logInfo.log.remove(message, txId);
- else {
- LinkedList tasks;
- synchronized (transactedTasks) {
- tasks = (LinkedList)transactedTasks.get(txId);
- }
- if( tasks == null )
- throw new javax.jms.JMSException("Transaction is not active 6.");
- synchronized(tasks){
- tasks.addLast(new Transaction(false, logInfo, message, txId));
- }
- }
- }
class Transaction {
private LogInfo logInfo;
@@ -294,7 +196,7 @@
//TODO: make sure this lock is good enough
synchronized (q) {
for (int i = 0; i < rebuild.length; i++) {
- q.restoreMessage(rebuild[i], logInfo.queueId);
+ q.restoreMessage(rebuild[i]);
}
}
}
@@ -315,5 +217,112 @@
JMSServer server = (JMSServer)getServer().invoke(new
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {}, new String[] {} );
restore(server);
+ }
+
+ public void add(org.jbossmq.SpyMessage message, Long txId) throws
javax.jms.JMSException {
+
+ LogInfo logInfo;
+
+ synchronized (messageLogs) {
+ logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination());
+ }
+
+ if (logInfo == null) {
+ category.debug("Destination was not initialized : "+
message.getJMSDestination());
+ throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
+ }
+
+ logInfo.log.add(message, txId);
+
+ if( txId != null ) {
+ LinkedList tasks;
+ synchronized( transactedTasks ) {
+ tasks = (LinkedList)transactedTasks.get(txId);
+ }
+ if( tasks == null )
+ throw new javax.jms.JMSException("Transaction is not active 5.");
+ synchronized(tasks){
+ tasks.addLast(new Transaction(true, logInfo, message, txId));
+ }
+ }
+
+ }
+
+ public void destroyQueue( SpyDestination dest) throws javax.jms.JMSException {
+
+ try {
+
+ URL logDir = new URL(dataDirURL, dest.toString());
+ java.io.File file = new java.io.File(logDir.getFile());
+
+ LogInfo logInfo;
+ synchronized(messageLogs){
+ logInfo = (LogInfo)messageLogs.remove(""+dest);
+ }
+ if( logInfo == null )
+ throw new JMSException("The persistence log was never
initialized");
+
+ logInfo.log.close();
+ file.delete();
+
+ } catch (javax.jms.JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+
+ }
+
+ public void initQueue(SpyDestination dest) throws javax.jms.JMSException {
+
+ try {
+
+ URL logDir= new URL(dataDirURL, dest.toString());
+ MessageLog log= new MessageLog(logDir.getFile());
+
+ LogInfo info= new LogInfo(log, dest);
+
+ category.debug("Initializing persistence for destination: "+ dest);
+ synchronized (messageLogs) {
+ messageLogs.put("" + dest, info);
+ }
+
+ } catch (javax.jms.JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ javax.jms.JMSException newE= new javax.jms.JMSException("Invalid
configuration.");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+
+ }
+
+ public void remove(org.jbossmq.SpyMessage message, Long txId) throws
javax.jms.JMSException {
+
+ LogInfo logInfo;
+
+ synchronized (messageLogs) {
+ logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination());
+ }
+
+ if (logInfo == null)
+ throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
+
+ if( txId == null )
+ logInfo.log.remove(message, txId);
+ else {
+ LinkedList tasks;
+ synchronized (transactedTasks) {
+ tasks = (LinkedList)transactedTasks.get(txId);
+ }
+ if( tasks == null )
+ throw new javax.jms.JMSException("Transaction is not active 6.");
+ synchronized(tasks){
+ tasks.addLast(new Transaction(false, logInfo, message, txId));
+ }
+ }
+
}
}
1.3 +1 -1
jbossmq/src/main/org/jbossmq/pm/file/PersistenceManagerMBean.java
Index: PersistenceManagerMBean.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/file/PersistenceManagerMBean.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- PersistenceManagerMBean.java 2001/07/16 02:51:46 1.2
+++ PersistenceManagerMBean.java 2001/07/28 00:33:38 1.3
@@ -41,11 +41,11 @@
*
* @see <related>
* @author Vincent Sheffer ([EMAIL PROTECTED])
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public interface PersistenceManagerMBean
extends org.jboss.util.ServiceMBean
{
public java.lang.String getDataDirectory();
public void setDataDirectory(java.lang.String newDataDirectory);
-}
+}
\ No newline at end of file
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development