User: d_jencks
Date: 01/11/10 13:38:05
Modified: src/main/org/jboss/mq/pm/jdbc MessageLog.java
PersistenceManager.java
PersistenceManagerMBean.java TxLog.java
Log:
Changed MBean dependencies to work directly through MBean references; eliminated the
depends tag from the *service.xml files.
Revision Changes Path
1.6 +258 -141 jbossmq/src/main/org/jboss/mq/pm/jdbc/MessageLog.java
Index: MessageLog.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/MessageLog.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- MessageLog.java 2001/10/28 04:07:34 1.5
+++ MessageLog.java 2001/11/10 21:38:05 1.6
@@ -6,184 +6,301 @@
*/
package org.jboss.mq.pm.jdbc;
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.FileOutputStream;
-import java.io.ObjectOutputStream;
+
+
+import java.io.*;
+import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.ObjectInputStream;
-import java.io.File;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
import java.sql.*;
-import java.io.*;
-
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
import javax.jms.JMSException;
import javax.sql.*;
-
import org.jboss.mq.SpyDestination;
-import org.jboss.mq.SpyMessage;
import org.jboss.mq.SpyJMSException;
+import org.jboss.mq.SpyMessage;
+import org.jboss.mq.server.MessageCache;
+import org.jboss.mq.server.MessageReference;
/**
* This is used to keep SpyMessages on the disk and is used reconstruct the
* queue in case of provider failure.
*
* @author: Jayesh Parayali ([EMAIL PROTECTED])
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class MessageLog {
/////////////////////////////////////////////////////////////////////
// Attributes
/////////////////////////////////////////////////////////////////////
- //private File queueName;
- protected DataSource datasource;
+ //maybe this will work with hypersonic??
+ private static boolean SUPPORTS_OBJECTS = true;
+
+ private final DataSource datasource;
+
+ private final MessageCache messageCache;
+
+ MessageLog(MessageCache messageCache, javax.sql.DataSource datasource)
+ throws JMSException
+ {
+ if (messageCache == null)
+ {
+ throw new IllegalArgumentException("Need a MessageCache to construct a
MessageLog!");
+ } // end of if ()
+
+ if (datasource == null)
+ {
+ throw new IllegalArgumentException("Need a datasource to construct a
MessageLog!");
+ } // end of if ()
+
+ this.messageCache = messageCache;
+ this.datasource = datasource;
+ try
+ {
+ Connection c = datasource.getConnection();
+ try
+ {
+ ResultSet rs = c.getMetaData().getTables(null, null, "jms_messages",
null);
+ if (!rs.next())
+ {
+ Statement s = c.createStatement();
+ try
+ {
+ s.executeUpdate("create table jms_messages (destination
varchar(32), messageblob object, messageid varchar(32))");
+
+
+ }
+ finally
+ {
+ s.close();
+ } // end of try-catch
+ } // end of if ()
+ rs.close();
+ }
+ finally
+ {
+ c.close();
+ } // end of try-catch
+ }
+ catch (SQLException e)
+ {
+ throwJMSException("could not find or set up message table", e);
+ } // end of try-catch
+
+ }
/////////////////////////////////////////////////////////////////////
// Public Methods
/////////////////////////////////////////////////////////////////////
public void close() throws JMSException {
}
-
- public SpyMessage[] restore(java.util.TreeSet comittingTXs, String dest) throws
JMSException {
- String destin= dest.substring(21, dest.length());
- java.util.TreeMap messageIndex= new java.util.TreeMap();
- PreparedStatement pstmt= null;
- ResultSet rs= null;
- Connection con= null;
-
- try {
- con= datasource.getConnection();
- pstmt= con.prepareStatement("select messageblob, messageid from
jms_messages where destination = ?");
- pstmt.setString(1, destin);
- rs= pstmt.executeQuery();
-
- while (rs.next()) {
- byte[] st= (byte[]) rs.getObject(1);
- ByteArrayInputStream baip= new ByteArrayInputStream(st);
- ObjectInputStream ois= new ObjectInputStream(baip);
- // re-create the object
- SpyMessage message= (SpyMessage) ois.readObject();
-
- //Long msgId = new
Long(Long.parseLong(rs.getString(2).trim(),16));
- //restore the messageId which is not persistent.
- message.header.messageId=
Long.parseLong(rs.getString(2).trim(), 16);
- Long msgId= new Long(message.header.messageId);
- messageIndex.put(msgId, message);
- }
- } catch (SQLException e) {
- throwJMSException("SQL error while rebuilding the tranaction log.",
e);
- } catch (Exception e) {
- throwJMSException("Could not rebuild the queue from the queue's
tranaction log.", e);
- } finally {
- try {
- if (rs != null)
- rs.close();
- if (pstmt != null)
- pstmt.close();
- if (con != null)
- con.close();
- } catch (SQLException e) {
- throwJMSException("SQL error while closing the database
connection", e);
- }
- }
-
- SpyMessage rc[]= new SpyMessage[messageIndex.size()];
- java.util.Iterator iter= messageIndex.values().iterator();
- for (int i= 0; iter.hasNext(); i++)
- rc[i]= (SpyMessage) iter.next();
- return rc;
+ public Map restoreAll() throws JMSException {
+ //WTF is 21???
+ //String destin= dest.substring(21, dest.length());
+
+ Map unrestoredMessages = new HashMap();
+
+ //TreeMap messageIndex= new TreeMap();
+ PreparedStatement pstmt= null;
+ ResultSet rs= null;
+ Connection con= null;
+
+ try
+ {
+ try
+ {
+ con= datasource.getConnection();
+ try
+ {
+
+ pstmt= con.prepareStatement("select destination, messageblob,
messageid from jms_messages");
+ //pstmt.setString(1, destin);
+ try
+ {
+ rs= pstmt.executeQuery();
+ while (rs.next())
+ {
+ String dest = rs.getString(1);
+ SpyMessage message = null;
+ if (SUPPORTS_OBJECTS)
+ {
+ message = (SpyMessage)rs.getObject(2);
+ } // end of if ()
+ else
+ {
+ byte[] st= (byte[]) rs.getObject(2);
+ ByteArrayInputStream baip= new ByteArrayInputStream(st);
+ ObjectInputStream ois= new ObjectInputStream(baip);
+ // re-create the object
+ message = (SpyMessage) ois.readObject();
+ } // end of else
+
+ //restore the messageId which is not persistent.
+ //ID stored in hexadecimal string!!
+ message.header.messageId=
Long.parseLong(rs.getString(3).trim(), 16);
+ Long msgId= new Long(message.header.messageId);
+ MessageReference mr = messageCache.add(message);
+ Map messageIndex = (Map)unrestoredMessages.get(dest);
+ if (messageIndex == null)
+ {
+ messageIndex = new TreeMap();
+ unrestoredMessages.put(dest, messageIndex);
+ } // end of if ()
+
+ messageIndex.put(msgId, mr);
+ }
+ }
+ finally
+ {
+ rs.close();
+ } // end of finally
+ }
+ finally
+ {
+ pstmt.close();
+ } // end of finally
+ }
+ finally
+ {
+ con.close();
+ } // end of finally
+
+ }
+ catch (SQLException e)
+ {
+ throwJMSException("SQL error while rebuilding the tranaction log.", e);
+ }
+ catch (Exception e)
+ {
+ throwJMSException("Could not rebuild the queue from the queue's tranaction
log.", e);
+ }
+ return unrestoredMessages;
}
private void throwJMSException(String message, Exception e) throws JMSException {
- JMSException newE= new SpyJMSException(message);
- newE.setLinkedException(e);
- throw newE;
+ JMSException newE= new SpyJMSException(message);
+ newE.setLinkedException(e);
+ throw newE;
}
- /////////////////////////////////////////////////////////////////////
- // Constructor
- /////////////////////////////////////////////////////////////////////
- public MessageLog(javax.sql.DataSource datasource, String dest) throws
JMSException {
- this.datasource = datasource;
- }
-
- public void add(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws
JMSException {
- PreparedStatement pstmt= null;
- Connection con= null;
-
- try {
- con= datasource.getConnection();
- ByteArrayOutputStream baos= new ByteArrayOutputStream();
- ObjectOutputStream oos= new ObjectOutputStream(baos);
- oos.writeObject(message);
- byte[] messageAsBytes= baos.toByteArray();
- pstmt= con.prepareStatement("insert into jms_messages (messageid,
destination, messageblob) VALUES(?,?,?)");
- ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
- pstmt.setString(2, ((SpyDestination)
message.getJMSDestination()).getName());
- pstmt.setBinaryStream(3, bais, messageAsBytes.length);
- String hexString= null;
- if (message.header.messageId <= 0)
- hexString= "-" + Long.toHexString((-1) *
message.header.messageId);
- else
- hexString= Long.toHexString(message.header.messageId);
-
- pstmt.setString(1, hexString);
- pstmt.executeUpdate();
-
- pstmt.close();
- } catch (IOException e) {
- throwJMSException("Could serialize the message.", e);
- } catch (SQLException e) {
- throwJMSException("Could not write message to the database.", e);
- } finally {
- try {
- //if (pstmt != null)
- //pstmt.close();
- if (con != null)
- con.close();
- } catch (SQLException e) {
- throwJMSException("Could not close the database.", e);
- }
+ public void add(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws
JMSException
+ {
+ PreparedStatement pstmt= null;
+ Connection con= null;
+
+ try
+ {
+ con= datasource.getConnection();
+ pstmt= con.prepareStatement("insert into jms_messages (messageid,
destination, messageblob) VALUES(?,?,?)");
+ String hexString= null;
+ if (message.header.messageId <= 0)
+ {
+ hexString= "-" + Long.toHexString((-1) * message.header.messageId);
+ }
+ else
+ {
+ hexString= Long.toHexString(message.header.messageId);
+ } // end of else
+ pstmt.setString(1, hexString);
+ pstmt.setString(2, ((SpyDestination)
message.getJMSDestination()).getName());
+ if (SUPPORTS_OBJECTS)
+ {
+ pstmt.setObject(3, message);
+ } // end of if ()
+ else
+ {
+ ByteArrayOutputStream baos= new ByteArrayOutputStream();
+ ObjectOutputStream oos= new ObjectOutputStream(baos);
+ oos.writeObject(message);
+ byte[] messageAsBytes= baos.toByteArray();
+ ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
+ pstmt.setBinaryStream(3, bais, messageAsBytes.length);
+ } // end of else
+
+ pstmt.executeUpdate();
+
+ pstmt.close();
+ }
+ catch (IOException e)
+ {
+ throwJMSException("Could serialize the message.", e);
+ }
+ catch (SQLException e)
+ {
+ throwJMSException("Could not write message to the database.", e);
+ }
+ finally
+ {
+ try
+ {
+ //if (pstmt != null)
+ //pstmt.close();
+ if (con != null)
+ con.close();
+ }
+ catch (SQLException e)
+ {
+ throwJMSException("Could not close the database.", e);
+ }
- }
+ }
}
public javax.sql.DataSource getDatasource() {
- return datasource;
+ return datasource;
}
- public void remove(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws
JMSException {
- PreparedStatement pstmt= null;
- Connection con= null;
- try {
- con= datasource.getConnection();
- pstmt= con.prepareStatement("delete from jms_messages where messageid
= ? and destination = ?");
- String hexString= null;
- if (message.header.messageId <= 0)
- hexString= "-" + Long.toHexString((-1) *
message.header.messageId);
- else
- hexString= Long.toHexString(message.header.messageId);
- pstmt.setString(1, hexString);
- pstmt.setString(2, ((SpyDestination)
message.getJMSDestination()).getName().trim());
-
- pstmt.execute();
- } catch (SQLException e) {
- throwJMSException("Could not remove the message.", e);
- } finally {
- try {
- if (pstmt != null)
- pstmt.close();
- if (con != null)
- con.close();
- } catch (SQLException e) {
- throwJMSException("Could not close the database.", e);
- }
+ public void remove(SpyMessage message, org.jboss.mq.pm.Tx transactionId)
+ throws JMSException
+ {
+ PreparedStatement pstmt= null;
+ Connection con= null;
+ try
+ {
+ con= datasource.getConnection();
+ pstmt= con.prepareStatement("delete from jms_messages where messageid = ?
and destination = ?");
+ String hexString= null;
+ if (message.header.messageId <= 0)
+ {
+ hexString= "-" + Long.toHexString((-1) * message.header.messageId);
+ }
+ else
+ {
+ hexString= Long.toHexString(message.header.messageId);
+ }
+ pstmt.setString(1, hexString);
+ pstmt.setString(2, ((SpyDestination)
message.getJMSDestination()).getName().trim());
+
+ pstmt.execute();
+ }
+ catch (SQLException e)
+ {
+ throwJMSException("Could not remove the message.", e);
+ }
+ finally
+ {
+ try
+ {
+ if (pstmt != null)
+ pstmt.close();
+ if (con != null)
+ con.close();
+ }
+ catch (SQLException e)
+ {
+ throwJMSException("Could not close the database.", e);
+ }
- }
+ }
}
- public void setDatasource(javax.sql.DataSource newDatasource) {
- datasource= newDatasource;
- }
-}
\ No newline at end of file
+}
1.9 +225 -173 jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManager.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- PersistenceManager.java 2001/10/28 01:27:00 1.8
+++ PersistenceManager.java 2001/11/10 21:38:05 1.9
@@ -6,34 +6,37 @@
*/
package org.jboss.mq.pm.jdbc;
-import javax.rmi.PortableRemoteObject;
-import javax.jms.JMSException;
-import javax.sql.*;
-import javax.naming.*;
-import javax.management.*;
-import javax.naming.InitialContext;
+
+
+
+import java.io.*;
import java.net.URL;
+import java.sql.*;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.TreeSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Properties;
-import java.sql.*;
-import java.io.*;
-
-
-import org.jboss.mq.SpyDestination;
-import org.jboss.mq.xml.XElement;
+import java.util.TreeSet;
+import javax.jms.JMSException;
+import javax.management.*;
+import javax.naming.*;
+import javax.naming.InitialContext;
+import javax.rmi.PortableRemoteObject;
+import javax.sql.*;
import org.jboss.mq.ConnectionToken;
+import org.jboss.mq.SpyDestination;
+import org.jboss.mq.SpyJMSException;
+import org.jboss.mq.SpyMessage;
+import org.jboss.mq.pm.TxManager;
import org.jboss.mq.server.JMSDestination;
import org.jboss.mq.server.JMSServer;
-import org.jboss.mq.pm.TxManager;
-import org.jboss.mq.SpyMessage;
-import org.jboss.mq.SpyJMSException;
+import org.jboss.mq.server.MessageCache;
import org.jboss.mq.server.MessageReference;
-
+import org.jboss.mq.xml.XElement;
import org.jboss.system.ServiceMBeanSupport;
+import java.util.Map;
/**
* This class manages all persistence related services for file based
@@ -41,16 +44,33 @@
*
* @author: Jayesh Parayali ([EMAIL PROTECTED])
*
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
public class PersistenceManager extends ServiceMBeanSupport implements
PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager {
+
+
+ private ObjectName messageCacheName;
+ private MessageCache messageCache;
+
+
+
+ private ObjectName dataSourceName;
+ private DataSource datasource;
- protected static DataSource datasource;
+ private String jmsDBPoolName;
+
+ //we only need one- it has no state dependent on destination.
+ private MessageLog messageLog;
+
+ private Map unrestoredMessages;
+
+ private TxManager txManager;
- // Log file used to store commited transactions.
+
+ // Object to handle transaction recording.
TxLog txLog;
// Maps SpyDestinations to SpyMessageLogs
- HashMap messageLogs= new HashMap();
+ //HashMap messageLogs= new HashMap();
// Maps (Long)txIds to LinkedList of AddFile tasks
HashMap transactedTasks= new HashMap();
@@ -60,142 +80,121 @@
txManager = new TxManager(this);
}
- static class LogInfo {
- MessageLog log;
- SpyDestination destination;
-
- LogInfo(MessageLog log, SpyDestination destination) {
- this.log= log;
- this.destination= destination;
- }
+ public org.jboss.mq.pm.PersistenceManager getInstance()
+ {
+ return this;
}
- class Transaction {
- private LogInfo logInfo;
- private SpyMessage message;
- private org.jboss.mq.pm.Tx txId;
- private boolean add;
- public Transaction(boolean add, LogInfo logInfo, SpyMessage message,
org.jboss.mq.pm.Tx txId) {
- this.add= add;
- this.logInfo= logInfo;
- this.message= message;
- this.txId= txId;
- }
- public void commit() throws JMSException {
- if (!add)
- logInfo.log.remove(message, txId);
- }
- public void rollback() throws JMSException {
- if (add)
- logInfo.log.remove(message, txId);
- }
+ public ObjectName getMessageCache()
+ {
+ return messageCacheName;
}
-
- private String jmsDBPoolName;
- TxManager txManager;
- /**
- * Insert the method's description here.
- * Creation date: (6/27/2001 1:07:07 AM)
- * @return java.lang.String
- */
- public java.lang.String getJmsDBPoolName() {
- return jmsDBPoolName;
+ public void setMessageCache(ObjectName messageCache)
+ {
+ this.messageCacheName = messageCache;
}
- public String getName() {
- return "JBossMQ-PersistenceManager";
+ public MessageCache getMessageCacheInstance()
+ {
+ return messageCache;
}
- public void initService() throws Exception {
-
- JMSServer server= (JMSServer) getServer().invoke(new
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {
- }, new String[] {
- });
- server.setPersistenceManager(this);
+ public ObjectName getDataSource()
+ {
+ return dataSourceName;
}
- /**
- * Insert the method's description here.
- * Creation date: (6/27/2001 1:07:07 AM)
- * @param newJmsDBPoolName java.lang.String
- */
- public void setJmsDBPoolName(java.lang.String newJmsDBPoolName) {
- jmsDBPoolName= newJmsDBPoolName;
- }
-
- public void startService() throws Exception {
-
- //Get an InitialContext
- InitialContext ctx= new InitialContext();
- datasource= (DataSource) ctx.lookup(jmsDBPoolName);
- txLog= new TxLog(datasource);
-
- Iterator i= messageLogs.values().iterator();
- while (i.hasNext()) {
- LogInfo li= (LogInfo) i.next();
- li.log.setDatasource(datasource);
- }
-
- JMSServer server= (JMSServer) getServer().invoke(new
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {
- }, new String[] {
- });
- restore(server);
-
+ public void setDataSource(ObjectName dataSourceName)
+ {
+ this.dataSourceName = dataSourceName;
}
- public void destroyQueue(SpyDestination dest) throws javax.jms.JMSException {
+ public String getName() {
+ return "JBossMQ-jdbc-PersistenceManager";
+ }
- try {
- //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
- //java.io.File file = new java.io.File(logDir.getFile());
- LogInfo logInfo;
- synchronized (messageLogs) {
- logInfo= (LogInfo) messageLogs.remove("" + dest);
- }
- if (logInfo == null)
- throw new SpyJMSException("The persistence log was never
initialized");
+ public void startService() throws Exception
+ {
+ //Find the ConnectionFactoryLoader MBean so we can find the datasource
+ String dsName = (String)getServer().getAttribute(dataSourceName, "JndiName");
+ //Get an InitialContext
- logInfo.log.close();
- //file.delete();
+ InitialContext ctx= new InitialContext();
+ datasource= (DataSource) ctx.lookup("java:/" + dsName);
+ txLog= new TxLog(datasource);
- } catch (javax.jms.JMSException e) {
- throw e;
- } catch (Exception e) {
- javax.jms.JMSException newE= new javax.jms.JMSException("Invalid
configuration.");
- newE.setLinkedException(e);
- throw newE;
- }
+ messageCache = (MessageCache)getServer().invoke(messageCacheName,
"getInstance", new Object[] {}, new String[] {});
+ messageLog = new MessageLog(messageCache, datasource);
+ restoreTransactions();
}
- public void initQueue(SpyDestination dest) throws javax.jms.JMSException {
- try {
-
- MessageLog log= new MessageLog(datasource, dest.toString());
- LogInfo info= new LogInfo(log, dest);
-
- synchronized (messageLogs) {
- messageLogs.put("" + dest, info);
- }
+ private void restoreTransactions() throws JMSException
+ {
+ Collection lostTx = txLog.restore();
+ if (!lostTx.isEmpty())
+ {
+ log.error("Unrecoverable transactions found in jdbc persistence manager!
Your data is corrupt!");
+ } // end of if ()
+
+ unrestoredMessages = messageLog.restoreAll();
+ }
- } catch (javax.jms.JMSException e) {
- throw e;
- } catch (Exception e) {
- javax.jms.JMSException newE= new javax.jms.JMSException("Invalid
configuration.");
- newE.setLinkedException(e);
- throw newE;
- }
+ public void restoreQueue(JMSDestination jmsDest, SpyDestination dest)
+ throws javax.jms.JMSException
+ {
+ if (jmsDest == null)
+ {
+ throw new IllegalArgumentException("Must supply non null JMSDestination to
restoreQueue");
+ } // end of if ()
+ if (dest == null)
+ {
+ throw new IllegalArgumentException("Must supply non null SpyDestination to
restoreQueue");
+ } // end of if ()
+ Map messages = (Map)unrestoredMessages.get(dest.getName());
+ if (messages != null)
+ {
+ for (Iterator i = messages.values().iterator(); i.hasNext();)
+ {
+ jmsDest.restoreMessage((MessageReference)i.next());
+ } // end of for ()
+
+ } // end of if ()
+
+ }
+ public void destroyQueue(SpyDestination dest) throws javax.jms.JMSException
+ {/*
+ try {
+ LogInfo logInfo;
+ synchronized (messageLogs) {
+ logInfo= (LogInfo) messageLogs.remove("" + dest);
+ }
+ if (logInfo == null)
+ throw new SpyJMSException("The persistence log was never initialized");
+
+ logInfo.log.close();
+ //file.delete();
+
+ } catch (javax.jms.JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ javax.jms.JMSException newE= new javax.jms.JMSException("Invalid
configuration.");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+ */
}
+
public void add(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException {
- LogInfo logInfo;
+ // LogInfo logInfo;
SpyMessage message = messageRef.getMessage();
-
+ /*
synchronized (messageLogs) {
logInfo= (LogInfo) messageLogs.get("" + message.getJMSDestination());
}
@@ -204,6 +203,9 @@
throw new javax.jms.JMSException("Destination was not initalized with
the PersistenceManager");
logInfo.log.add(message, txId);
+ */
+ //messageLog will figure out what destination to use.
+ messageLog.add(message,txId);
if (txId != null) {
LinkedList tasks;
@@ -213,7 +215,7 @@
if (tasks == null)
throw new javax.jms.JMSException("Transaction is not active
5.");
synchronized (tasks) {
- tasks.addLast(new Transaction(true, logInfo, message, txId));
+ tasks.addLast(new Transaction(true, message, txId));
}
}
@@ -221,63 +223,69 @@
public void commitPersistentTx(org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException {
- LinkedList transacted;
- synchronized (transactedTasks) {
- transacted= (LinkedList) transactedTasks.remove(txId);
- }
- synchronized (transacted) {
- Iterator iter= transacted.iterator();
- while (iter.hasNext()) {
- Transaction task= (Transaction) iter.next();
- task.commit();
- }
- }
+ LinkedList transacted;
+ synchronized (transactedTasks) {
+ transacted= (LinkedList) transactedTasks.remove(txId);
+ }
+ synchronized (transacted) {
+ Iterator iter= transacted.iterator();
+ while (iter.hasNext()) {
+ Transaction task= (Transaction) iter.next();
+ task.commit();
+ }
+ }
- txLog.commitTx(txId);
+ txLog.commitTx(txId);
}
public org.jboss.mq.pm.Tx createPersistentTx() throws javax.jms.JMSException {
- org.jboss.mq.pm.Tx txId= txLog.createTx();
- synchronized (transactedTasks) {
- transactedTasks.put(txId, new LinkedList());
- }
- return txId;
+ org.jboss.mq.pm.Tx txId= txLog.createTx();
+ synchronized (transactedTasks) {
+ transactedTasks.put(txId, new LinkedList());
+ }
+ return txId;
}
/**
- * getTxManager method comment.
- */
+ * getTxManager method comment.
+ */
public org.jboss.mq.pm.TxManager getTxManager() {
- return txManager;
+ return txManager;
}
+
public void remove(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException {
- LogInfo logInfo;
+ // LogInfo logInfo;
SpyMessage message = messageRef.getMessage();
-
+ /*
synchronized (messageLogs) {
logInfo= (LogInfo) messageLogs.get("" + message.getJMSDestination());
}
if (logInfo == null)
throw new javax.jms.JMSException("Destination was not initalized with
the PersistenceManager");
-
+ */
if (txId == null)
- logInfo.log.remove(message, txId);
- else {
+ {
+ //logInfo.log.remove(message, txId);
+ messageLog.remove(message, txId);
+ }
+ else
+ {
LinkedList tasks;
synchronized (transactedTasks) {
- tasks= (LinkedList) transactedTasks.get(txId);
+ tasks= (LinkedList) transactedTasks.get(txId);
}
if (tasks == null)
throw new javax.jms.JMSException("Transaction is not active
6.");
synchronized (tasks) {
- tasks.addLast(new Transaction(false, logInfo, message, txId));
+ tasks.addLast(new Transaction(false, message, txId));
}
}
}
-
+ //not sure this one is used.
+/*
public void restore(org.jboss.mq.server.JMSServer server) throws
javax.jms.JMSException {
TreeSet committingTXs= txLog.restore();
@@ -306,21 +314,65 @@
}
}
-
+*/
public void rollbackPersistentTx(org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException {
- LinkedList transacted;
- synchronized (transactedTasks) {
- transacted= (LinkedList) transactedTasks.remove(txId);
- }
- synchronized (transacted) {
- Iterator iter= transacted.iterator();
- while (iter.hasNext()) {
- Transaction task= (Transaction) iter.next();
- task.rollback();
- }
- }
+ LinkedList transacted;
+ synchronized (transactedTasks) {
+ transacted= (LinkedList) transactedTasks.remove(txId);
+ }
+ synchronized (transacted) {
+ Iterator iter= transacted.iterator();
+ while (iter.hasNext()) {
+ Transaction task= (Transaction) iter.next();
+ task.rollback();
+ }
+ }
+
+ txLog.rollbackTx(txId);
+ }
- txLog.rollbackTx(txId);
+
+ /*static class LogInfo {
+ MessageLog log;
+ SpyDestination destination;
+
+ LogInfo(MessageLog log, SpyDestination destination) {
+ this.log= log;
+ this.destination= destination;
+ }
+ }
+ */
+ class Transaction {
+ //private LogInfo logInfo;
+ private SpyMessage message;
+ private org.jboss.mq.pm.Tx txId;
+ private boolean add;
+ public Transaction(boolean add, SpyMessage message, org.jboss.mq.pm.Tx txId) {
+ this.add= add;
+ //this.logInfo= logInfo;
+ this.message= message;
+ this.txId= txId;
+ }
+ public void commit() throws JMSException
+ {
+ if (!add)
+ {
+ messageLog.remove(message, txId);
+ }
+ }
+ public void rollback() throws JMSException
+ {
+ if (add)
+ {
+ messageLog.remove(message, txId);
+ }
+ }
}
-}
\ No newline at end of file
+
+
+
+}
+
+
+
1.5 +10 -16
jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManagerMBean.java
Index: PersistenceManagerMBean.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManagerMBean.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- PersistenceManagerMBean.java 2001/09/01 03:01:00 1.4
+++ PersistenceManagerMBean.java 2001/11/10 21:38:05 1.5
@@ -6,6 +6,7 @@
*/
package org.jboss.mq.pm.jdbc;
+import javax.management.ObjectName;
import org.jboss.system.ServiceMBean;
/**
@@ -13,24 +14,17 @@
*
* @author Vincent Sheffer ([EMAIL PROTECTED])
* @see <related>
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
-public interface PersistenceManagerMBean extends ServiceMBean
+public interface PersistenceManagerMBean
+ extends ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean
{
- public final static String OBJECT_NAME = ":service=JBossMQ";
+ public final static String OBJECT_NAME = "JBOSSMQ:service=PersistenceManager";
- /**
- * Gets the JmsDBPoolName attribute of the PersistenceManagerMBean object
- *
- * @return The JmsDBPoolName value
- */
- public java.lang.String getJmsDBPoolName();
-
- /**
- * Sets the JmsDBPoolName attribute of the PersistenceManagerMBean object
- *
- * @param newJmsDBPoolName The new JmsDBPoolName value
- */
- public void setJmsDBPoolName(java.lang.String newJmsDBPoolName);
+ ObjectName getDataSource();
+
+ void setDataSource(ObjectName dataSource);
+
+
}
1.3 +73 -31 jbossmq/src/main/org/jboss/mq/pm/jdbc/TxLog.java
Index: TxLog.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/TxLog.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- TxLog.java 2001/08/17 03:04:05 1.2
+++ TxLog.java 2001/11/10 21:38:05 1.3
@@ -25,7 +25,7 @@
*
* @created August 16, 2001
* @author: Jayesh Parayali ([EMAIL PROTECTED])
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class TxLog {
@@ -33,52 +33,94 @@
// Attributes
/////////////////////////////////////////////////////////////////////
protected long nextTransactionId = Long.MIN_VALUE;
- protected static DataSource ds = null;
+ private final DataSource ds;
/////////////////////////////////////////////////////////////////////
// Constructors
/////////////////////////////////////////////////////////////////////
- public TxLog( DataSource datasource )
- throws JMSException {
- if ( ds == null ) {
- ds = datasource;
+ public TxLog( DataSource ds ) throws JMSException
+ {
+ if (ds == null)
+ {
+ throw new IllegalArgumentException("Must supply a datasource to construct
a TxLog");
+ }
+ this.ds = ds;
+ try
+ {
+ Connection c = ds.getConnection();
+ try
+ {
+ ResultSet rs = c.getMetaData().getTables(null, null,
"jms_transactions", null);
+ if (!rs.next())
+ {
+ Statement s = c.createStatement();
+ try
+ {
+ s.executeUpdate("create table jms_transactions (id varchar(32)
primary key)");
+ }
+ finally
+ {
+ s.close();
+ } // end of try-catch
+
+ } // end of if ()
+ rs.close();
+ }
+ finally
+ {
+ c.close();
+ } // end of try-catch
}
+ catch (SQLException e)
+ {
+ throwJMSException("could not find or set up transaction table", e);
+ } // end of try-catch
+
}
- public synchronized java.util.TreeSet restore()
+ public synchronized TreeSet restore()
throws JMSException {
TreeSet items = new TreeSet();
- ;
Connection con = null;
PreparedStatement stmt = null;
ResultSet rs = null;
- try {
- con = getConnection();
- stmt = con.prepareStatement( "select id from jms_transactions" );
- rs = stmt.executeQuery();
- while ( rs.next() ) {
- long id = Long.parseLong( rs.getString( 1 ).trim(), 16 );
- items.add( new Long( id ) );
- }
- } catch ( SQLException e ) {
- throwJMSException( "Could not write transaction log on commit.", e );
- } finally {
- try {
- if ( rs != null ) {
- rs.close();
+ try
+ {
+ try
+ {
+ con = getConnection();
+ try
+ {
+ stmt = con.prepareStatement( "select id from jms_transactions" );
+ try
+ {
+ rs = stmt.executeQuery();
+ while ( rs.next() )
+ {
+ long id = Long.parseLong( rs.getString( 1 ).trim(), 16 );
+ items.add( new Long( id ) );
+ }
+ }
+ finally
+ {
+ rs.close();
+ } // end of finally
}
- if ( stmt != null ) {
- stmt.close();
- }
- if ( con != null ) {
- con.close();
- }
- } catch ( SQLException e ) {
- throwJMSException( "Could not close database connection in transaction
log (restore)", e );
+ finally
+ {
+ stmt.close();
+ } // end of finally
}
+ finally
+ {
+ con.close();
+ } // end of finally
}
-
+ catch ( SQLException e )
+ {
+ throwJMSException( "Could not write transaction log on commit.", e );
+ }
return items;
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development