User: chirino
Date: 01/07/27 17:33:38
Modified: src/main/org/jbossmq/pm/jdbc MessageLog.java
PersistenceManager.java
PersistenceManagerMBean.java TxLog.java
Log:
Once again many changes.
- The logic that handled the processing of queue and topic messages
was seperated our more to make it easier to follow.
- A QueuedTask class was created to avoid unneeded processing of queues.
- The interface between the client-server-queues-peristence manager to handel
DurableSubscription was too verbose, created a DurableSubscripton class and now
SpyTopics can be inspected to see if they are being used as a DurableSubscription
- The MBeans that add queues and topics makes it simpler to configure a queue/topic.
Revision Changes Path
1.2 +13 -12 jbossmq/src/main/org/jbossmq/pm/jdbc/MessageLog.java
Index: MessageLog.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/jdbc/MessageLog.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- MessageLog.java 2001/07/11 02:52:16 1.1
+++ MessageLog.java 2001/07/28 00:33:38 1.2
@@ -27,7 +27,7 @@
* queue in case of provider failure.
*
* @author: Jayesh Parayali ([EMAIL PROTECTED])
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class MessageLog {
@@ -37,20 +37,14 @@
//private File queueName;
protected static DataSource datasource;
- /////////////////////////////////////////////////////////////////////
- // Constructor
- /////////////////////////////////////////////////////////////////////
- public MessageLog(DataSource datasource, String dest, String queueId) throws
JMSException {
- if (this.datasource == null)
- this.datasource = datasource;
- }
+
/////////////////////////////////////////////////////////////////////
// Public Methods
/////////////////////////////////////////////////////////////////////
public void close() throws JMSException {
- }
+ }
public void add( SpyMessage message, Long transactionId ) throws JMSException {
PreparedStatement pstmt = null;
@@ -95,7 +89,7 @@
}
}
- }
+ }
public void remove( SpyMessage message, Long transactionId ) throws JMSException {
PreparedStatement pstmt = null;
@@ -128,7 +122,7 @@
}
}
- }
+ }
public SpyMessage[] restore(java.util.TreeSet comittingTXs, String dest) throws
JMSException {
String destin = dest.substring(21,dest.length());
@@ -188,12 +182,19 @@
for( int i=0; iter.hasNext(); i++ )
rc[i] = (SpyMessage)iter.next();
return rc;
- }
+ }
private void throwJMSException(String message, Exception e) throws JMSException {
JMSException newE = new JMSException(message);
newE.setLinkedException(e);
throw newE;
- }
+ }
+ /////////////////////////////////////////////////////////////////////
+ // Constructor
+ /////////////////////////////////////////////////////////////////////
+ public MessageLog(DataSource datasource, String dest) throws JMSException {
+ if (this.datasource == null)
+ this.datasource = datasource;
+ }
}
1.2 +111 -105 jbossmq/src/main/org/jbossmq/pm/jdbc/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/jdbc/PersistenceManager.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- PersistenceManager.java 2001/07/11 02:52:16 1.1
+++ PersistenceManager.java 2001/07/28 00:33:38 1.2
@@ -40,7 +40,7 @@
*
* @author: Jayesh Parayali ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class PersistenceManager extends ServiceMBeanSupport
implements org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean,
MBeanRegistration {
@@ -62,12 +62,10 @@
static class LogInfo {
MessageLog log;
SpyDestination destination;
- String queueId;
- LogInfo(MessageLog log, SpyDestination destination, String queueId) {
+ LogInfo(MessageLog log, SpyDestination destination) {
this.log=log;
this.destination=destination;
- this.queueId=queueId;
}
}
@@ -81,7 +79,7 @@
transactedTasks.put(txId, new LinkedList());
}
return txId;
- }
+ }
public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
@@ -98,7 +96,7 @@
}
txLog.commitTx(txId);
- }
+ }
public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
@@ -115,108 +113,15 @@
}
txLog.rollbackTx(txId);
- }
-
- public void initQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
- try {
- //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
-
- MessageLog log = new MessageLog(datasource, dest.toString(), queueId);
-
- LogInfo info = new LogInfo(log, dest, queueId);
-
- synchronized(messageLogs){
- messageLogs.put(""+dest+"-"+queueId, info);
- }
-
- } catch (javax.jms.JMSException e) {
- throw e;
- } catch (Exception e) {
- javax.jms.JMSException newE = new javax.jms.JMSException("Invalid
configuration.");
- newE.setLinkedException(e);
- throw newE;
- }
-
- }
-
- public void destroyQueue( SpyDestination dest, String queueId ) throws
javax.jms.JMSException {
-
- try {
-
- //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
- //java.io.File file = new java.io.File(logDir.getFile());
-
- LogInfo logInfo;
- synchronized(messageLogs){
- logInfo = (LogInfo)messageLogs.remove(""+dest+"-"+queueId);
- }
- if( logInfo == null )
- throw new JMSException("The persistence log was never initialized");
-
- logInfo.log.close();
- //file.delete();
-
- } catch (javax.jms.JMSException e) {
- throw e;
- } catch (Exception e) {
- javax.jms.JMSException newE = new javax.jms.JMSException("Invalid
configuration.");
- newE.setLinkedException(e);
- throw newE;
- }
-
- }
-
- public void add(String queueId, org.jbossmq.SpyMessage message, Long txId) throws
javax.jms.JMSException {
- LogInfo logInfo;
-
- synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
- }
-
- if (logInfo == null)
- throw new javax.jms.JMSException("Destination was not initalized with the
PersistenceManager");
-
- logInfo.log.add(message, txId);
-
- if( txId != null ) {
- LinkedList tasks;
- synchronized( transactedTasks ) {
- tasks = (LinkedList)transactedTasks.get(txId);
- }
- if( tasks == null )
- throw new javax.jms.JMSException("Transaction is not active 5.");
- synchronized(tasks){
- tasks.addLast(new Transaction(true, logInfo, message, txId));
- }
- }
-
- }
-
- public void remove(String queueId, org.jbossmq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
- LogInfo logInfo;
+ }
- synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
- }
+
- if (logInfo == null)
- throw new javax.jms.JMSException("Destination was not initalized with the
PersistenceManager");
+
- if( txId == null )
- logInfo.log.remove(message, txId);
- else {
- LinkedList tasks;
- synchronized (transactedTasks) {
- tasks = (LinkedList)transactedTasks.get(txId);
- }
- if( tasks == null )
- throw new javax.jms.JMSException("Transaction is not active 6.");
- synchronized(tasks){
- tasks.addLast(new Transaction(false, logInfo, message, txId));
- }
- }
+
- }
+
class Transaction {
private LogInfo logInfo;
@@ -294,12 +199,12 @@
//TODO: make sure this lock is good enough
synchronized (q) {
for (int i = 0; i < rebuild.length; i++) {
- q.restoreMessage(rebuild[i], logInfo.queueId);
+ q.restoreMessage(rebuild[i]);
}
}
}
- }
+ }
/**
* Insert the method's description here.
@@ -316,4 +221,105 @@
restore(server);
}
+
+ public void add(org.jbossmq.SpyMessage message, Long txId) throws
javax.jms.JMSException {
+ LogInfo logInfo;
+
+ synchronized (messageLogs) {
+ logInfo = (LogInfo) messageLogs.get(""+message.getJMSDestination());
+ }
+
+ if (logInfo == null)
+ throw new javax.jms.JMSException("Destination was not initalized with the
PersistenceManager");
+
+ logInfo.log.add(message, txId);
+
+ if( txId != null ) {
+ LinkedList tasks;
+ synchronized( transactedTasks ) {
+ tasks = (LinkedList)transactedTasks.get(txId);
+ }
+ if( tasks == null )
+ throw new javax.jms.JMSException("Transaction is not active 5.");
+ synchronized(tasks){
+ tasks.addLast(new Transaction(true, logInfo, message, txId));
+ }
+ }
+
+ }
+
+ public void destroyQueue( SpyDestination dest ) throws javax.jms.JMSException {
+
+ try {
+
+ //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
+ //java.io.File file = new java.io.File(logDir.getFile());
+
+ LogInfo logInfo;
+ synchronized(messageLogs){
+ logInfo = (LogInfo)messageLogs.remove(""+dest);
+ }
+ if( logInfo == null )
+ throw new JMSException("The persistence log was never initialized");
+
+ logInfo.log.close();
+ //file.delete();
+
+ } catch (javax.jms.JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ javax.jms.JMSException newE = new javax.jms.JMSException("Invalid
configuration.");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+
+ }
+
+ public void initQueue( SpyDestination dest) throws javax.jms.JMSException {
+ try {
+ //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
+
+ MessageLog log = new MessageLog(datasource, dest.toString());
+
+ LogInfo info = new LogInfo(log, dest);
+
+ synchronized(messageLogs){
+ messageLogs.put(""+dest, info);
+ }
+
+ } catch (javax.jms.JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ javax.jms.JMSException newE = new javax.jms.JMSException("Invalid
configuration.");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+
+ }
+
+ public void remove(org.jbossmq.SpyMessage message, Long txId) throws
javax.jms.JMSException {
+ LogInfo logInfo;
+
+ synchronized (messageLogs) {
+ logInfo = (LogInfo) messageLogs.get(""+message.getJMSDestination());
+ }
+
+ if (logInfo == null)
+ throw new javax.jms.JMSException("Destination was not initalized with the
PersistenceManager");
+
+ if( txId == null )
+ logInfo.log.remove(message, txId);
+ else {
+ LinkedList tasks;
+ synchronized (transactedTasks) {
+ tasks = (LinkedList)transactedTasks.get(txId);
+ }
+ if( tasks == null )
+ throw new javax.jms.JMSException("Transaction is not active 6.");
+ synchronized(tasks){
+ tasks.addLast(new Transaction(false, logInfo, message, txId));
+ }
+ }
+
+ }
}
1.3 +1 -1
jbossmq/src/main/org/jbossmq/pm/jdbc/PersistenceManagerMBean.java
Index: PersistenceManagerMBean.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/jdbc/PersistenceManagerMBean.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- PersistenceManagerMBean.java 2001/07/16 02:51:46 1.2
+++ PersistenceManagerMBean.java 2001/07/28 00:33:38 1.3
@@ -41,7 +41,7 @@
*
* @see <related>
* @author Vincent Sheffer ([EMAIL PROTECTED])
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public interface PersistenceManagerMBean
extends org.jboss.util.ServiceMBean
@@ -61,4 +61,4 @@
// Public --------------------------------------------------------
public java.lang.String getJmsDBPoolName();
public void setJmsDBPoolName(java.lang.String newJmsDBPoolName);
-}
+}
\ No newline at end of file
1.2 +7 -7 jbossmq/src/main/org/jbossmq/pm/jdbc/TxLog.java
Index: TxLog.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/jdbc/TxLog.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- TxLog.java 2001/07/11 02:52:16 1.1
+++ TxLog.java 2001/07/28 00:33:38 1.2
@@ -22,7 +22,7 @@
* It is used to rollback transactions when the system restarts.
*
* @author: Jayesh Parayali ([EMAIL PROTECTED])
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class TxLog {
@@ -38,11 +38,11 @@
public TxLog(DataSource datasource) throws JMSException {
if (ds == null)
ds = datasource;
- }
+ }
private final Connection getConnection() throws SQLException {
return ds.getConnection();
- }
+ }
synchronized public Long createTx() throws JMSException {
Long id = new Long(nextTransactionId++);
@@ -74,7 +74,7 @@
throwJMSException("Could not close database connection in transaction
log (createTx).",e);
}
return id;
- }
+ }
synchronized public void commitTx(Long txId) throws JMSException {
Connection con = null;
@@ -104,7 +104,7 @@
catch(SQLException e) {
throwJMSException("Could not close database connection in transaction
log (commitTx)",e);
}
- }
+ }
synchronized public void rollbackTx(Long txId) throws JMSException {
Connection con = null;
@@ -134,7 +134,7 @@
catch(SQLException e) {
throwJMSException("Could not close database connection in transaction
log (rollbackTx)",e);
}
- }
+ }
synchronized public java.util.TreeSet restore() throws JMSException {
TreeSet items = new TreeSet();;
@@ -167,7 +167,7 @@
}
return items;
- }
+ }
/////////////////////////////////////////////////////////////////////
// Private Methods
@@ -176,6 +176,6 @@
JMSException newE = new JMSException(message);
newE.setLinkedException(e);
throw newE;
- }
+ }
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development