User: d_jencks
  Date: 01/11/10 13:38:05

  Modified:    src/main/org/jboss/mq/pm/jdbc MessageLog.java
                        PersistenceManager.java
                        PersistenceManagerMBean.java TxLog.java
  Log:
  Changed mbean dependencies to work directly by mbean-references: eliminated depends 
tag from *service.xml files
  
  Revision  Changes    Path
  1.6       +258 -141  jbossmq/src/main/org/jboss/mq/pm/jdbc/MessageLog.java
  
  Index: MessageLog.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/MessageLog.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- MessageLog.java   2001/10/28 04:07:34     1.5
  +++ MessageLog.java   2001/11/10 21:38:05     1.6
  @@ -6,184 +6,301 @@
    */
   package org.jboss.mq.pm.jdbc;
   
  -import java.io.IOException;
  -import java.io.Serializable;
  -import java.io.FileOutputStream;
  -import java.io.ObjectOutputStream;
  +
  +
  +import java.io.*;
  +import java.io.File;
   import java.io.FileInputStream;
  +import java.io.FileOutputStream;
  +import java.io.IOException;
   import java.io.ObjectInputStream;
  -import java.io.File;
  +import java.io.ObjectOutputStream;
  +import java.io.Serializable;
   import java.sql.*;
  -import java.io.*;
  -
  +import java.util.HashMap;
  +import java.util.Map;
  +import java.util.Set;
  +import java.util.TreeMap;
   import javax.jms.JMSException;
   import javax.sql.*;
  -
   import org.jboss.mq.SpyDestination;
  -import org.jboss.mq.SpyMessage;
   import org.jboss.mq.SpyJMSException;
  +import org.jboss.mq.SpyMessage;
  +import org.jboss.mq.server.MessageCache;
  +import org.jboss.mq.server.MessageReference;
   
   /**
    * This is used to keep SpyMessages on the disk and is used reconstruct the
    * queue in case of provider failure.
    *
    * @author: Jayesh Parayali ([EMAIL PROTECTED])
  - * @version $Revision: 1.5 $
  + * @version $Revision: 1.6 $
    */
   public class MessageLog {
   
      /////////////////////////////////////////////////////////////////////
      // Attributes
      /////////////////////////////////////////////////////////////////////
  -   //private File queueName;
  -   protected DataSource datasource;
  +   //maybe this will work with hypersonic??
  +   private static boolean SUPPORTS_OBJECTS = true;
  +
  +   private final DataSource datasource;
  +
  +   private final MessageCache messageCache;
  +
  +   MessageLog(MessageCache messageCache, javax.sql.DataSource datasource) 
  +      throws JMSException
  +   {
  +      if (messageCache == null) 
  +      {
  +         throw new IllegalArgumentException("Need a MessageCache to construct a 
MessageLog!");
  +      } // end of if ()
  +      
  +      if (datasource == null) 
  +      {
  +         throw new IllegalArgumentException("Need a datasource to construct a 
MessageLog!");
  +      } // end of if ()
  +      
  +      this.messageCache = messageCache;
  +      this.datasource = datasource;
  +      try 
  +      {
  +         Connection c = datasource.getConnection();
  +         try 
  +         {
  +            ResultSet rs = c.getMetaData().getTables(null, null, "jms_messages", 
null);
  +            if (!rs.next()) 
  +            {
  +               Statement s = c.createStatement();
  +               try 
  +               {
  +                  s.executeUpdate("create table jms_messages (destination 
varchar(32), messageblob object, messageid varchar(32))");
  +                  
  +
  +               }
  +               finally
  +               {
  +                  s.close(); 
  +               } // end of try-catch
  +            } // end of if ()
  +            rs.close();
  +         } 
  +         finally 
  +         {
  +            c.close();
  +         } // end of try-catch
  +      }
  +      catch (SQLException e) 
  +      {
  +         throwJMSException("could not find or set up message table", e);
  +      } // end of try-catch
  +
  +   }
   
      /////////////////////////////////////////////////////////////////////
      // Public Methods
      /////////////////////////////////////////////////////////////////////
      public void close() throws JMSException {
      }
  -
  -   public SpyMessage[] restore(java.util.TreeSet comittingTXs, String dest) throws 
JMSException {
  -       String destin= dest.substring(21, dest.length());
   
  -       java.util.TreeMap messageIndex= new java.util.TreeMap();
  -       PreparedStatement pstmt= null;
  -       ResultSet rs= null;
  -       Connection con= null;
  -
  -       try {
  -              con= datasource.getConnection();
  -              pstmt= con.prepareStatement("select messageblob, messageid from 
jms_messages where destination = ?");
  -              pstmt.setString(1, destin);
  -              rs= pstmt.executeQuery();
  -
  -              while (rs.next()) {
  -                     byte[] st= (byte[]) rs.getObject(1);
  -                     ByteArrayInputStream baip= new ByteArrayInputStream(st);
  -                     ObjectInputStream ois= new ObjectInputStream(baip);
  -                     // re-create the object
  -                     SpyMessage message= (SpyMessage) ois.readObject();
  -
  -                     //Long msgId = new 
Long(Long.parseLong(rs.getString(2).trim(),16));
  -                     //restore the messageId which is not persistent.
  -                     message.header.messageId= 
Long.parseLong(rs.getString(2).trim(), 16);
  -                     Long msgId= new Long(message.header.messageId);
  -                     messageIndex.put(msgId, message);
  -              }
  -       } catch (SQLException e) {
  -              throwJMSException("SQL error while rebuilding the tranaction log.", 
e);
  -       } catch (Exception e) {
  -              throwJMSException("Could not rebuild the queue from the queue's 
tranaction log.", e);
  -       } finally {
  -              try {
  -                     if (rs != null)
  -                        rs.close();
  -                     if (pstmt != null)
  -                        pstmt.close();
  -                     if (con != null)
  -                        con.close();
  -              } catch (SQLException e) {
  -                     throwJMSException("SQL error while closing the database 
connection", e);
  -              }
  -       }
  -
  -       SpyMessage rc[]= new SpyMessage[messageIndex.size()];
  -       java.util.Iterator iter= messageIndex.values().iterator();
  -       for (int i= 0; iter.hasNext(); i++)
  -              rc[i]= (SpyMessage) iter.next();
  -       return rc;
  +   public Map restoreAll() throws JMSException {
  +      //WTF is 21???
  +      //String destin= dest.substring(21, dest.length());
  +
  +      Map unrestoredMessages = new HashMap();
  +
  +      //TreeMap messageIndex= new TreeMap();
  +      PreparedStatement pstmt= null;
  +      ResultSet rs= null;
  +      Connection con= null;
  +
  +      try 
  +      {
  +         try 
  +         {
  +            con= datasource.getConnection();
  +            try 
  +            {
  +    
  +               pstmt= con.prepareStatement("select destination, messageblob, 
messageid from jms_messages");
  +               //pstmt.setString(1, destin);
  +               try 
  +               {
  +                  rs= pstmt.executeQuery();
  +                  while (rs.next()) 
  +                  {
  +                     String dest = rs.getString(1);
  +                     SpyMessage message = null;
  +                     if (SUPPORTS_OBJECTS) 
  +                     {
  +                        message = (SpyMessage)rs.getObject(2);
  +                     } // end of if ()
  +                     else 
  +                     {
  +                        byte[] st= (byte[]) rs.getObject(2);
  +                        ByteArrayInputStream baip= new ByteArrayInputStream(st);
  +                        ObjectInputStream ois= new ObjectInputStream(baip);
  +                        // re-create the object
  +                        message = (SpyMessage) ois.readObject();
  +                     } // end of else
  +                     
  +                     //restore the messageId which is not persistent.
  +                     //ID stored in hexadecimal string!!
  +                     message.header.messageId= 
Long.parseLong(rs.getString(3).trim(), 16);
  +                     Long msgId= new Long(message.header.messageId);
  +                     MessageReference mr = messageCache.add(message);
  +                     Map messageIndex = (Map)unrestoredMessages.get(dest);
  +                     if (messageIndex == null) 
  +                     {
  +                        messageIndex = new TreeMap();
  +                        unrestoredMessages.put(dest, messageIndex);
  +                     } // end of if ()
  +                     
  +                     messageIndex.put(msgId, mr);
  +                  }
  +               }
  +               finally 
  +               {
  +                  rs.close();
  +               } // end of finally
  +            }
  +            finally 
  +            {
  +               pstmt.close();
  +            } // end of finally
  +         }
  +         finally 
  +         {
  +            con.close();
  +         } // end of finally
  +      
  +      }        
  +      catch (SQLException e) 
  +      {
  +         throwJMSException("SQL error while rebuilding the tranaction log.", e);
  +      } 
  +      catch (Exception e) 
  +      {
  +         throwJMSException("Could not rebuild the queue from the queue's tranaction 
log.", e);
  +      }
  +      return unrestoredMessages;
      }
   
      private void throwJMSException(String message, Exception e) throws JMSException {
  -       JMSException newE= new SpyJMSException(message);
  -       newE.setLinkedException(e);
  -       throw newE;
  +      JMSException newE= new SpyJMSException(message);
  +      newE.setLinkedException(e);
  +      throw newE;
      }
   
  -   /////////////////////////////////////////////////////////////////////
  -   // Constructor
  -   /////////////////////////////////////////////////////////////////////
  -   public MessageLog(javax.sql.DataSource datasource, String dest) throws 
JMSException {
  -       this.datasource = datasource;
  -   }
  -
  -   public void add(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws 
JMSException {
  -       PreparedStatement pstmt= null;
  -       Connection con= null;
  -
  -       try {
  -              con= datasource.getConnection();
  -              ByteArrayOutputStream baos= new ByteArrayOutputStream();
  -              ObjectOutputStream oos= new ObjectOutputStream(baos);
  -              oos.writeObject(message);
  -              byte[] messageAsBytes= baos.toByteArray();
  -              pstmt= con.prepareStatement("insert into jms_messages (messageid, 
destination, messageblob) VALUES(?,?,?)");
  -              ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
  -              pstmt.setString(2, ((SpyDestination) 
message.getJMSDestination()).getName());
  -              pstmt.setBinaryStream(3, bais, messageAsBytes.length);
  -              String hexString= null;
  -              if (message.header.messageId <= 0)
  -                     hexString= "-" + Long.toHexString((-1) * 
message.header.messageId);
  -              else
  -                     hexString= Long.toHexString(message.header.messageId);
  -
  -              pstmt.setString(1, hexString);
  -              pstmt.executeUpdate();
  -
  -              pstmt.close();
  -       } catch (IOException e) {
  -              throwJMSException("Could serialize the message.", e);
  -       } catch (SQLException e) {
  -              throwJMSException("Could not write message to the database.", e);
  -       } finally {
  -              try {
  -                     //if (pstmt != null)
  -                     //pstmt.close();
  -                     if (con != null)
  -                        con.close();
  -              } catch (SQLException e) {
  -                     throwJMSException("Could not close the database.", e);
  -              }
  +   public void add(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws 
JMSException 
  +   {
  +      PreparedStatement pstmt= null;
  +      Connection con= null;
  +
  +      try 
  +      {
  +         con= datasource.getConnection();
  +         pstmt= con.prepareStatement("insert into jms_messages (messageid, 
destination, messageblob) VALUES(?,?,?)");
  +         String hexString= null;
  +         if (message.header.messageId <= 0)
  +         {
  +            hexString= "-" + Long.toHexString((-1) * message.header.messageId);
  +         }
  +         else 
  +         {
  +            hexString= Long.toHexString(message.header.messageId);
  +         } // end of else
  +         pstmt.setString(1, hexString);
  +         pstmt.setString(2, ((SpyDestination) 
message.getJMSDestination()).getName());
  +         if (SUPPORTS_OBJECTS) 
  +         {
  +            pstmt.setObject(3, message);
  +         } // end of if ()
  +         else 
  +         {
  +            ByteArrayOutputStream baos= new ByteArrayOutputStream();
  +            ObjectOutputStream oos= new ObjectOutputStream(baos);
  +            oos.writeObject(message);
  +            byte[] messageAsBytes= baos.toByteArray();
  +            ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
  +            pstmt.setBinaryStream(3, bais, messageAsBytes.length);
  +         } // end of else
  +                 
  +         pstmt.executeUpdate();
  +
  +         pstmt.close();
  +      } 
  +      catch (IOException e) 
  +      {
  +         throwJMSException("Could serialize the message.", e);
  +      } 
  +      catch (SQLException e) 
  +      {
  +         throwJMSException("Could not write message to the database.", e);
  +      } 
  +      finally 
  +      {
  +         try 
  +         {
  +            //if (pstmt != null)
  +            //pstmt.close();
  +            if (con != null)
  +               con.close();
  +         } 
  +         catch (SQLException e) 
  +         {
  +            throwJMSException("Could not close the database.", e);
  +         }
   
  -       }
  +      }
      }
   
      public javax.sql.DataSource getDatasource() {
  -       return datasource;
  +      return datasource;
      }
   
  -   public void remove(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws 
JMSException {
  -       PreparedStatement pstmt= null;
  -       Connection con= null;
  -       try {
  -              con= datasource.getConnection();
  -              pstmt= con.prepareStatement("delete from jms_messages where messageid 
= ? and destination = ?");
  -              String hexString= null;
  -              if (message.header.messageId <= 0)
  -                     hexString= "-" + Long.toHexString((-1) * 
message.header.messageId);
  -              else
  -                     hexString= Long.toHexString(message.header.messageId);
  -              pstmt.setString(1, hexString);
  -              pstmt.setString(2, ((SpyDestination) 
message.getJMSDestination()).getName().trim());
  -
  -              pstmt.execute();
  -       } catch (SQLException e) {
  -              throwJMSException("Could not remove the message.", e);
  -       } finally {
  -              try {
  -                     if (pstmt != null)
  -                        pstmt.close();
  -                     if (con != null)
  -                        con.close();
  -              } catch (SQLException e) {
  -                     throwJMSException("Could not close the database.", e);
  -              }
  +   public void remove(SpyMessage message, org.jboss.mq.pm.Tx transactionId) 
  +      throws JMSException 
  +   {
  +      PreparedStatement pstmt= null;
  +      Connection con= null;
  +      try 
  +      {
  +         con= datasource.getConnection();
  +         pstmt= con.prepareStatement("delete from jms_messages where messageid = ? 
and destination = ?");
  +         String hexString= null;
  +         if (message.header.messageId <= 0)
  +         {
  +            hexString= "-" + Long.toHexString((-1) * message.header.messageId);
  +         }
  +         else
  +         {
  +            hexString= Long.toHexString(message.header.messageId);
  +         }
  +         pstmt.setString(1, hexString);
  +         pstmt.setString(2, ((SpyDestination) 
message.getJMSDestination()).getName().trim());
  +
  +         pstmt.execute();
  +      }
  +      catch (SQLException e) 
  +      {
  +         throwJMSException("Could not remove the message.", e);
  +      } 
  +      finally 
  +      {
  +         try 
  +         {
  +            if (pstmt != null)
  +               pstmt.close();
  +            if (con != null)
  +               con.close();
  +         }
  +         catch (SQLException e) 
  +         {
  +            throwJMSException("Could not close the database.", e);
  +         }
   
  -       }
  +      }
      }
   
  -   public void setDatasource(javax.sql.DataSource newDatasource) {
  -       datasource= newDatasource;
  -   }
  -}
  \ No newline at end of file
  +}
  
  
  
  1.9       +225 -173  jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManager.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- PersistenceManager.java   2001/10/28 01:27:00     1.8
  +++ PersistenceManager.java   2001/11/10 21:38:05     1.9
  @@ -6,34 +6,37 @@
    */
   package org.jboss.mq.pm.jdbc;
   
  -import javax.rmi.PortableRemoteObject;
  -import javax.jms.JMSException;
  -import javax.sql.*;
  -import javax.naming.*;
  -import javax.management.*;
  -import javax.naming.InitialContext;
   
  +
  +
  +
  +import java.io.*;
   import java.net.URL;
  +import java.sql.*;
  +import java.util.Collection;
   import java.util.HashMap;
  -import java.util.TreeSet;
   import java.util.Iterator;
   import java.util.LinkedList;
   import java.util.Properties;
  -import java.sql.*;
  -import java.io.*;
  -
  -
  -import org.jboss.mq.SpyDestination;
  -import org.jboss.mq.xml.XElement;
  +import java.util.TreeSet;
  +import javax.jms.JMSException;
  +import javax.management.*;
  +import javax.naming.*;
  +import javax.naming.InitialContext;
  +import javax.rmi.PortableRemoteObject;
  +import javax.sql.*;
   import org.jboss.mq.ConnectionToken;
  +import org.jboss.mq.SpyDestination;
  +import org.jboss.mq.SpyJMSException;
  +import org.jboss.mq.SpyMessage;
  +import org.jboss.mq.pm.TxManager;
   import org.jboss.mq.server.JMSDestination;
   import org.jboss.mq.server.JMSServer;
  -import org.jboss.mq.pm.TxManager;
  -import org.jboss.mq.SpyMessage;
  -import org.jboss.mq.SpyJMSException;
  +import org.jboss.mq.server.MessageCache;
   import org.jboss.mq.server.MessageReference;
  -
  +import org.jboss.mq.xml.XElement;
   import org.jboss.system.ServiceMBeanSupport;
  +import java.util.Map;
   
   /**
    *  This class manages all persistence related services for file based
  @@ -41,16 +44,33 @@
    *
    * @author: Jayesh Parayali ([EMAIL PROTECTED])
    *
  - *  @version $Revision: 1.8 $
  + *  @version $Revision: 1.9 $
    */
   public class PersistenceManager extends ServiceMBeanSupport implements 
PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager {
  +
  +
  +   private ObjectName messageCacheName;
  +   private MessageCache messageCache;
  +
  +
  +
  +   private ObjectName dataSourceName;
  +   private DataSource datasource;
   
  -   protected static DataSource datasource;
  +   private String jmsDBPoolName;
  +
  +   //we only need one- it has no state dependent on destination.
  +   private MessageLog messageLog;
  +
  +   private Map unrestoredMessages;
  +
  +   private TxManager txManager;
   
  -   // Log file used to store commited transactions.
  +
  +   // Object to handle transaction recording.
      TxLog txLog;
      // Maps SpyDestinations to SpyMessageLogs
  -   HashMap messageLogs= new HashMap();
  +   //HashMap messageLogs= new HashMap();
      // Maps (Long)txIds to LinkedList of AddFile tasks
      HashMap transactedTasks= new HashMap();
   
  @@ -60,142 +80,121 @@
         txManager = new TxManager(this);
      }
   
  -   static class LogInfo {
  -       MessageLog log;
  -       SpyDestination destination;
  -
  -       LogInfo(MessageLog log, SpyDestination destination) {
  -              this.log= log;
  -              this.destination= destination;
  -       }
  +   public org.jboss.mq.pm.PersistenceManager getInstance()
  +   {
  +      return this;
      }
   
  -   class Transaction {
  -       private LogInfo logInfo;
  -       private SpyMessage message;
  -       private org.jboss.mq.pm.Tx txId;
  -       private boolean add;
  -       public Transaction(boolean add, LogInfo logInfo, SpyMessage message, 
org.jboss.mq.pm.Tx txId) {
  -              this.add= add;
  -              this.logInfo= logInfo;
  -              this.message= message;
  -              this.txId= txId;
  -       }
  -       public void commit() throws JMSException {
  -              if (!add)
  -                     logInfo.log.remove(message, txId);
  -       }
  -       public void rollback() throws JMSException {
  -              if (add)
  -                     logInfo.log.remove(message, txId);
  -       }
  +   public ObjectName getMessageCache()
  +   {
  +      return messageCacheName;
      }
  -
  -   private String jmsDBPoolName;
  -   TxManager txManager;
   
  -   /**
  -     * Insert the method's description here.
  -     * Creation date: (6/27/2001 1:07:07 AM)
  -     * @return java.lang.String
  -     */
  -   public java.lang.String getJmsDBPoolName() {
  -       return jmsDBPoolName;
  +   public void setMessageCache(ObjectName messageCache)
  +   {
  +      this.messageCacheName = messageCache;
      }
   
  -   public String getName() {
  -       return "JBossMQ-PersistenceManager";
  +   public MessageCache getMessageCacheInstance()
  +   {
  +      return messageCache;
      }
   
  -   public void initService() throws Exception {
  -
  -       JMSServer server= (JMSServer) getServer().invoke(new 
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {
  -       }, new String[] {
  -       });
  -       server.setPersistenceManager(this);
   
  +   public ObjectName getDataSource()
  +   {
  +      return dataSourceName;
      }
   
  -   /**
  -     * Insert the method's description here.
  -     * Creation date: (6/27/2001 1:07:07 AM)
  -     * @param newJmsDBPoolName java.lang.String
  -     */
  -   public void setJmsDBPoolName(java.lang.String newJmsDBPoolName) {
  -       jmsDBPoolName= newJmsDBPoolName;
  -   }
  -
  -   public void startService() throws Exception {
  -
  -       //Get an InitialContext
  -       InitialContext ctx= new InitialContext();
  -       datasource= (DataSource) ctx.lookup(jmsDBPoolName);
  -       txLog= new TxLog(datasource);
  -
  -       Iterator i= messageLogs.values().iterator();
  -       while (i.hasNext()) {
  -              LogInfo li= (LogInfo) i.next();
  -              li.log.setDatasource(datasource);
  -       }
  -
  -       JMSServer server= (JMSServer) getServer().invoke(new 
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {
  -       }, new String[] {
  -       });
  -       restore(server);
  -
  +   public void setDataSource(ObjectName dataSourceName)
  +   {
  +      this.dataSourceName = dataSourceName;
      }
   
  -   public void destroyQueue(SpyDestination dest) throws javax.jms.JMSException {
  +   public String getName() {
  +      return "JBossMQ-jdbc-PersistenceManager";
  +   }
   
  -       try {
   
  -              //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
  -              //java.io.File file = new java.io.File(logDir.getFile());
   
  -              LogInfo logInfo;
  -              synchronized (messageLogs) {
  -                     logInfo= (LogInfo) messageLogs.remove("" + dest);
  -              }
  -              if (logInfo == null)
  -                     throw new SpyJMSException("The persistence log was never 
initialized");
  +   public void startService() throws Exception 
  +   {
  +      //Find the ConnectionFactoryLoader MBean so we can find the datasource
  +      String dsName = (String)getServer().getAttribute(dataSourceName, "JndiName");
  +      //Get an InitialContext
   
  -              logInfo.log.close();
  -              //file.delete();
  +      InitialContext ctx= new InitialContext();
  +      datasource= (DataSource) ctx.lookup("java:/" + dsName);
  +      txLog= new TxLog(datasource);
   
  -       } catch (javax.jms.JMSException e) {
  -              throw e;
  -       } catch (Exception e) {
  -              javax.jms.JMSException newE= new javax.jms.JMSException("Invalid 
configuration.");
  -              newE.setLinkedException(e);
  -              throw newE;
  -       }
  +      messageCache = (MessageCache)getServer().invoke(messageCacheName, 
"getInstance", new Object[] {}, new String[] {});
  +      messageLog = new MessageLog(messageCache, datasource);
  +      restoreTransactions();
   
      }
   
  -   public void initQueue(SpyDestination dest) throws javax.jms.JMSException {
  -       try {
  -
  -              MessageLog log= new MessageLog(datasource, dest.toString());
  -              LogInfo info= new LogInfo(log, dest);
  -
  -              synchronized (messageLogs) {
  -                     messageLogs.put("" + dest, info);
  -              }
  +   private void restoreTransactions() throws JMSException
  +   {
  +      Collection lostTx = txLog.restore();
  +      if (!lostTx.isEmpty()) 
  +      {
  +         log.error("Unrecoverable transactions found in jdbc persistence manager! 
Your data is corrupt!");         
  +      } // end of if ()
  +      
  +      unrestoredMessages = messageLog.restoreAll();
  +   }
   
  -       } catch (javax.jms.JMSException e) {
  -              throw e;
  -       } catch (Exception e) {
  -              javax.jms.JMSException newE= new javax.jms.JMSException("Invalid 
configuration.");
  -              newE.setLinkedException(e);
  -              throw newE;
  -       }
  +   public void restoreQueue(JMSDestination jmsDest, SpyDestination dest)
  +      throws javax.jms.JMSException
  +   {
  +      if (jmsDest == null) 
  +      {
  +         throw new IllegalArgumentException("Must supply non null JMSDestination to 
restoreQueue");
  +      } // end of if ()
  +      if (dest == null) 
  +      {
  +         throw new IllegalArgumentException("Must supply non null SpyDestination to 
restoreQueue");
  +      } // end of if ()
  +      Map messages = (Map)unrestoredMessages.get(dest.getName());
  +      if (messages != null) 
  +      {
  +         for (Iterator i = messages.values().iterator(); i.hasNext();) 
  +         {
  +            jmsDest.restoreMessage((MessageReference)i.next());
  +         } // end of for ()
  +         
  +      } // end of if ()
  +      
  +   }
   
  +   public void destroyQueue(SpyDestination dest) throws javax.jms.JMSException 
  +   {/*
  +      try {
  +         LogInfo logInfo;
  +         synchronized (messageLogs) {
  +            logInfo= (LogInfo) messageLogs.remove("" + dest);
  +         }
  +         if (logInfo == null)
  +            throw new SpyJMSException("The persistence log was never initialized");
  +
  +         logInfo.log.close();
  +         //file.delete();
  +
  +      } catch (javax.jms.JMSException e) {
  +         throw e;
  +      } catch (Exception e) {
  +         javax.jms.JMSException newE= new javax.jms.JMSException("Invalid 
configuration.");
  +         newE.setLinkedException(e);
  +         throw newE;
  +      }
  +    */
      }
   
  +
      public void add(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException {
  -       LogInfo logInfo;
  +      // LogInfo logInfo;
         SpyMessage message = messageRef.getMessage();
  -      
  +      /*
          synchronized (messageLogs) {
                 logInfo= (LogInfo) messageLogs.get("" + message.getJMSDestination());
          }
  @@ -204,6 +203,9 @@
                 throw new javax.jms.JMSException("Destination was not initalized with 
the PersistenceManager");
   
          logInfo.log.add(message, txId);
  +      */
  +      //messageLog will figure out what destination to use.
  +      messageLog.add(message,txId);
   
          if (txId != null) {
                 LinkedList tasks;
  @@ -213,7 +215,7 @@
                 if (tasks == null)
                        throw new javax.jms.JMSException("Transaction is not active 
5.");
                 synchronized (tasks) {
  -                     tasks.addLast(new Transaction(true, logInfo, message, txId));
  +                     tasks.addLast(new Transaction(true, message, txId));
                 }
          }
   
  @@ -221,63 +223,69 @@
   
      public void commitPersistentTx(org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException {
   
  -       LinkedList transacted;
  -       synchronized (transactedTasks) {
  -              transacted= (LinkedList) transactedTasks.remove(txId);
  -       }
  -       synchronized (transacted) {
  -              Iterator iter= transacted.iterator();
  -              while (iter.hasNext()) {
  -                     Transaction task= (Transaction) iter.next();
  -                     task.commit();
  -              }
  -       }
  +      LinkedList transacted;
  +      synchronized (transactedTasks) {
  +         transacted= (LinkedList) transactedTasks.remove(txId);
  +      }
  +      synchronized (transacted) {
  +         Iterator iter= transacted.iterator();
  +         while (iter.hasNext()) {
  +            Transaction task= (Transaction) iter.next();
  +            task.commit();
  +         }
  +      }
   
  -       txLog.commitTx(txId);
  +      txLog.commitTx(txId);
      }
   
      public org.jboss.mq.pm.Tx createPersistentTx() throws javax.jms.JMSException {
  -       org.jboss.mq.pm.Tx txId= txLog.createTx();
  -       synchronized (transactedTasks) {
  -              transactedTasks.put(txId, new LinkedList());
  -       }
  -       return txId;
  +      org.jboss.mq.pm.Tx txId= txLog.createTx();
  +      synchronized (transactedTasks) {
  +         transactedTasks.put(txId, new LinkedList());
  +      }
  +      return txId;
      }
   
      /**
  -     * getTxManager method comment.
  -     */
  +    * getTxManager method comment.
  +    */
      public org.jboss.mq.pm.TxManager getTxManager() {
  -       return txManager;
  +      return txManager;
      }
   
  +
      public void remove(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException {
  -       LogInfo logInfo;
  +      // LogInfo logInfo;
         SpyMessage message = messageRef.getMessage();
  -
  +      /*
          synchronized (messageLogs) {
                 logInfo= (LogInfo) messageLogs.get("" + message.getJMSDestination());
          }
   
          if (logInfo == null)
                 throw new javax.jms.JMSException("Destination was not initalized with 
the PersistenceManager");
  -
  +      */
          if (txId == null)
  -              logInfo.log.remove(message, txId);
  -       else {
  +          {
  +              //logInfo.log.remove(message, txId);
  +             messageLog.remove(message, txId);
  +          }
  +       else 
  +          {
                 LinkedList tasks;
                 synchronized (transactedTasks) {
  -                     tasks= (LinkedList) transactedTasks.get(txId);
  +                    tasks= (LinkedList) transactedTasks.get(txId);
                 }
                 if (tasks == null)
                        throw new javax.jms.JMSException("Transaction is not active 
6.");
                 synchronized (tasks) {
  -                     tasks.addLast(new Transaction(false, logInfo, message, txId));
  +                     tasks.addLast(new Transaction(false, message, txId));
                 }
          }
   
      }
  -
  +   //not sure this one is used.
  +/*
      public void restore(org.jboss.mq.server.JMSServer server) throws 
javax.jms.JMSException {
   
          TreeSet committingTXs= txLog.restore();
  @@ -306,21 +314,65 @@
          }
   
      }
  -
  +*/
      public void rollbackPersistentTx(org.jboss.mq.pm.Tx txId) throws 
javax.jms.JMSException {
   
  -       LinkedList transacted;
  -       synchronized (transactedTasks) {
  -              transacted= (LinkedList) transactedTasks.remove(txId);
  -       }
  -       synchronized (transacted) {
  -              Iterator iter= transacted.iterator();
  -              while (iter.hasNext()) {
  -                     Transaction task= (Transaction) iter.next();
  -                     task.rollback();
  -              }
  -       }
  +      LinkedList transacted;
  +      synchronized (transactedTasks) {
  +         transacted= (LinkedList) transactedTasks.remove(txId);
  +      }
  +      synchronized (transacted) {
  +         Iterator iter= transacted.iterator();
  +         while (iter.hasNext()) {
  +            Transaction task= (Transaction) iter.next();
  +            task.rollback();
  +         }
  +      }
  +
  +      txLog.rollbackTx(txId);
  +   }
   
  -       txLog.rollbackTx(txId);
  +
  +   /*static class LogInfo {
  +      MessageLog log;
  +      SpyDestination destination;
  +
  +      LogInfo(MessageLog log, SpyDestination destination) {
  +         this.log= log;
  +         this.destination= destination;
  +      }
  +   }
  +   */
  +   class Transaction {
  +      //private LogInfo logInfo;
  +      private SpyMessage message;
  +      private org.jboss.mq.pm.Tx txId;
  +      private boolean add;
  +      public Transaction(boolean add, SpyMessage message, org.jboss.mq.pm.Tx txId) {
  +         this.add= add;
  +         //this.logInfo= logInfo;
  +         this.message= message;
  +         this.txId= txId;
  +      }
  +      public void commit() throws JMSException 
  +      {
  +         if (!add)
  +         {
  +            messageLog.remove(message, txId);
  +         }
  +      }
  +      public void rollback() throws JMSException 
  +      {
  +         if (add)
  +         {
  +            messageLog.remove(message, txId);
  +         }
  +      }
      }
  -}
  \ No newline at end of file
  +
  +
  +
  +}
  +
  +
  +
  
  
  
  1.5       +10 -16    
jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManagerMBean.java
  
  Index: PersistenceManagerMBean.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManagerMBean.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- PersistenceManagerMBean.java      2001/09/01 03:01:00     1.4
  +++ PersistenceManagerMBean.java      2001/11/10 21:38:05     1.5
  @@ -6,6 +6,7 @@
    */
   package org.jboss.mq.pm.jdbc;
   
  +import javax.management.ObjectName;
   import org.jboss.system.ServiceMBean;
   
   /**
  @@ -13,24 +14,17 @@
    *
    * @author     Vincent Sheffer ([EMAIL PROTECTED])
    * @see        <related>
  - * @version    $Revision: 1.4 $
  + * @version    $Revision: 1.5 $
    */
  -public interface PersistenceManagerMBean extends ServiceMBean
  +public interface PersistenceManagerMBean 
  +   extends ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean
   {
   
  -   public final static String OBJECT_NAME = ":service=JBossMQ";
  +   public final static String OBJECT_NAME = "JBOSSMQ:service=PersistenceManager";
   
  -   /**
  -    *  Gets the JmsDBPoolName attribute of the PersistenceManagerMBean object
  -    *
  -    * @return    The JmsDBPoolName value
  -    */
  -   public java.lang.String getJmsDBPoolName();
  -
  -   /**
  -    *  Sets the JmsDBPoolName attribute of the PersistenceManagerMBean object
  -    *
  -    * @param  newJmsDBPoolName  The new JmsDBPoolName value
  -    */
  -   public void setJmsDBPoolName(java.lang.String newJmsDBPoolName);
  +   ObjectName getDataSource();
  +
  +   void setDataSource(ObjectName dataSource);  
  +
  +
   }
  
  
  
  1.3       +73 -31    jbossmq/src/main/org/jboss/mq/pm/jdbc/TxLog.java
  
  Index: TxLog.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/TxLog.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- TxLog.java        2001/08/17 03:04:05     1.2
  +++ TxLog.java        2001/11/10 21:38:05     1.3
  @@ -25,7 +25,7 @@
    *
    * @created    August 16, 2001
    * @author:    Jayesh Parayali ([EMAIL PROTECTED])
  - * @version    $Revision: 1.2 $
  + * @version    $Revision: 1.3 $
    */
   public class TxLog {
   
  @@ -33,52 +33,94 @@
      // Attributes
      /////////////////////////////////////////////////////////////////////
      protected long   nextTransactionId = Long.MIN_VALUE;
  -   protected static DataSource ds = null;
  +   private final DataSource ds;
   
      /////////////////////////////////////////////////////////////////////
      // Constructors
      /////////////////////////////////////////////////////////////////////
  -   public TxLog( DataSource datasource )
  -      throws JMSException {
  -      if ( ds == null ) {
  -         ds = datasource;
  +   public TxLog( DataSource ds ) throws JMSException
  +   {
  +      if (ds == null) 
  +      {
  +         throw new IllegalArgumentException("Must supply a datasource to construct 
a TxLog");
  +      }
  +      this.ds = ds;
  +      try 
  +      {
  +         Connection c = ds.getConnection();
  +         try 
  +         {
  +            ResultSet rs = c.getMetaData().getTables(null, null, 
"jms_transactions", null);
  +            if (!rs.next()) 
  +            {
  +               Statement s = c.createStatement();
  +               try 
  +               {
  +                  s.executeUpdate("create table jms_transactions (id varchar(32) 
primary key)");
  +               }
  +               finally
  +               {
  +                  s.close(); 
  +               } // end of try-catch
  +
  +            } // end of if ()
  +            rs.close();
  +         } 
  +         finally 
  +         {
  +            c.close();
  +         } // end of try-catch
         }
  +      catch (SQLException e) 
  +      {
  +         throwJMSException("could not find or set up transaction table", e);
  +      } // end of try-catch
  +
      }
   
   
  -   public synchronized java.util.TreeSet restore()
  +   public synchronized TreeSet restore()
         throws JMSException {
         TreeSet items = new TreeSet();
  -      ;
         Connection con = null;
         PreparedStatement stmt = null;
         ResultSet rs = null;
  -      try {
  -         con = getConnection();
  -         stmt = con.prepareStatement( "select  id from jms_transactions" );
  -         rs = stmt.executeQuery();
  -         while ( rs.next() ) {
  -            long id = Long.parseLong( rs.getString( 1 ).trim(), 16 );
  -            items.add( new Long( id ) );
  -         }
  -      } catch ( SQLException e ) {
  -         throwJMSException( "Could not write transaction log on commit.", e );
  -      } finally {
  -         try {
  -            if ( rs != null ) {
  -               rs.close();
  +      try 
  +      {
  +         try 
  +         {
  +            con = getConnection();
  +            try 
  +            {
  +               stmt = con.prepareStatement( "select  id from jms_transactions" );
  +               try 
  +               {
  +                  rs = stmt.executeQuery();
  +                  while ( rs.next() ) 
  +                  {
  +                     long id = Long.parseLong( rs.getString( 1 ).trim(), 16 );
  +                     items.add( new Long( id ) );
  +                  }
  +               }
  +               finally 
  +               {
  +                  rs.close();               
  +               } // end of finally
               }
  -            if ( stmt != null ) {
  -               stmt.close();
  -            }
  -            if ( con != null ) {
  -               con.close();
  -            }
  -         } catch ( SQLException e ) {
  -            throwJMSException( "Could not close database connection in transaction 
log (restore)", e );
  +            finally 
  +            {
  +               stmt.close();            
  +            } // end of finally
            }
  +         finally 
  +         {
  +            con.close();
  +         } // end of finally
         }
  -
  +      catch ( SQLException e ) 
  +      {
  +         throwJMSException( "Could not write transaction log on commit.", e );
  +      }
         return items;
      }
   
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to