User: chirino 
  Date: 01/07/27 17:33:38

  Modified:    src/main/org/jbossmq/pm/jdbc MessageLog.java
                        PersistenceManager.java
                        PersistenceManagerMBean.java TxLog.java
  Log:
  Once again many changes.
  - The logic that handled the processing of queue and topic messages
   was separated out more to make it easier to follow.
  - A QueuedTask class was created to avoid unneeded processing of queues.
  - The interface between the client-server-queues-persistence manager to handle
   DurableSubscription was too verbose; created a DurableSubscription class, and now
   SpyTopics can be inspected to see if they are being used as a DurableSubscription.
  - The MBeans that add queues and topics make it simpler to configure a queue/topic.
  
  Revision  Changes    Path
  1.2       +13 -12    jbossmq/src/main/org/jbossmq/pm/jdbc/MessageLog.java
  
  Index: MessageLog.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/jdbc/MessageLog.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- MessageLog.java   2001/07/11 02:52:16     1.1
  +++ MessageLog.java   2001/07/28 00:33:38     1.2
  @@ -27,7 +27,7 @@
    * queue in case of provider failure.
    *
    * @author: Jayesh Parayali ([EMAIL PROTECTED])
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class MessageLog {
   
  @@ -37,20 +37,14 @@
     //private File queueName;
     protected static DataSource datasource;
   
  -  /////////////////////////////////////////////////////////////////////
  -  // Constructor
  -  /////////////////////////////////////////////////////////////////////
  -  public MessageLog(DataSource datasource, String dest, String queueId) throws 
JMSException {
  -        if (this.datasource == null)
  -              this.datasource = datasource;
  -  }    
  +    
   
   
     /////////////////////////////////////////////////////////////////////
     // Public Methods
     /////////////////////////////////////////////////////////////////////
     public void close() throws JMSException {
  -  }    
  +  }      
   
     public void add( SpyMessage message, Long transactionId ) throws JMSException {
        PreparedStatement pstmt = null;
  @@ -95,7 +89,7 @@
          }
   
        }
  -  }    
  +  }      
   
     public void remove( SpyMessage message, Long transactionId ) throws JMSException {
        PreparedStatement pstmt = null;
  @@ -128,7 +122,7 @@
          }
   
        }
  -  }    
  +  }      
   
     public SpyMessage[] restore(java.util.TreeSet comittingTXs, String dest) throws 
JMSException {
        String destin = dest.substring(21,dest.length());
  @@ -188,12 +182,19 @@
        for( int i=0; iter.hasNext(); i++ )
          rc[i] = (SpyMessage)iter.next();
        return rc;
  -  }    
  +  }        
   
     private void throwJMSException(String message, Exception e) throws JMSException {
        JMSException newE = new JMSException(message);
        newE.setLinkedException(e);
        throw newE;
  -  }    
  +  }      
   
  +  /////////////////////////////////////////////////////////////////////
  +  // Constructor
  +  /////////////////////////////////////////////////////////////////////
  +  public MessageLog(DataSource datasource, String dest) throws JMSException {
  +        if (this.datasource == null)
  +              this.datasource = datasource;
  +  }    
   }
  
  
  
  1.2       +111 -105  jbossmq/src/main/org/jbossmq/pm/jdbc/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/jdbc/PersistenceManager.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- PersistenceManager.java   2001/07/11 02:52:16     1.1
  +++ PersistenceManager.java   2001/07/28 00:33:38     1.2
  @@ -40,7 +40,7 @@
    *
    * @author: Jayesh Parayali ([EMAIL PROTECTED])
    *
  - *  @version $Revision: 1.1 $
  + *  @version $Revision: 1.2 $
    */
   public class PersistenceManager extends ServiceMBeanSupport  
        implements org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean, 
MBeanRegistration {
  @@ -62,12 +62,10 @@
     static class LogInfo {
        MessageLog log;
        SpyDestination destination;
  -     String queueId;
   
  -     LogInfo(MessageLog log, SpyDestination destination, String queueId) {
  +     LogInfo(MessageLog log, SpyDestination destination) {
          this.log=log;
          this.destination=destination;
  -       this.queueId=queueId;
        }
     }
   
  @@ -81,7 +79,7 @@
          transactedTasks.put(txId, new LinkedList());
        }
        return txId;
  -  }    
  +  }      
   
     public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
   
  @@ -98,7 +96,7 @@
        }
   
        txLog.commitTx(txId);
  -  }    
  +  }      
   
     public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
   
  @@ -115,108 +113,15 @@
        }
   
        txLog.rollbackTx(txId);
  -  }    
  -
  -  public void initQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  -     try {
  -       //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
  -
  -       MessageLog log = new MessageLog(datasource, dest.toString(), queueId);
  -
  -       LogInfo info = new LogInfo(log, dest, queueId);
  -
  -       synchronized(messageLogs){
  -             messageLogs.put(""+dest+"-"+queueId, info);
  -       }
  -
  -     } catch (javax.jms.JMSException e) {
  -       throw e;
  -     } catch (Exception e) {
  -       javax.jms.JMSException newE = new javax.jms.JMSException("Invalid 
configuration.");
  -       newE.setLinkedException(e);
  -       throw newE;
  -     }
  -
  -  }    
  -
  -  public void destroyQueue( SpyDestination dest, String queueId ) throws 
javax.jms.JMSException {
  -
  -     try {
  -
  -       //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
  -       //java.io.File file = new java.io.File(logDir.getFile());
  -
  -       LogInfo logInfo;
  -       synchronized(messageLogs){
  -             logInfo = (LogInfo)messageLogs.remove(""+dest+"-"+queueId);
  -       }
  -       if( logInfo == null )
  -             throw new JMSException("The persistence log was never initialized");
  -
  -       logInfo.log.close();
  -       //file.delete();
  -
  -     } catch (javax.jms.JMSException e) {
  -       throw e;
  -     } catch (Exception e) {
  -       javax.jms.JMSException newE = new javax.jms.JMSException("Invalid 
configuration.");
  -       newE.setLinkedException(e);
  -       throw newE;
  -     }
  -
  -  }    
  -
  -  public void add(String queueId, org.jbossmq.SpyMessage message, Long txId) throws 
javax.jms.JMSException {
  -     LogInfo logInfo;
  -
  -     synchronized (messageLogs) {
  -       logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
  -     }
  -
  -     if (logInfo == null)
  -       throw new javax.jms.JMSException("Destination was not initalized with the 
PersistenceManager");
  -
  -     logInfo.log.add(message, txId);
  -
  -     if( txId != null ) {
  -       LinkedList tasks;
  -       synchronized( transactedTasks ) {
  -             tasks = (LinkedList)transactedTasks.get(txId);
  -       }
  -       if( tasks == null )
  -             throw new javax.jms.JMSException("Transaction is not active 5.");
  -       synchronized(tasks){
  -             tasks.addLast(new Transaction(true, logInfo, message, txId));
  -       }
  -     }
  -
  -  }    
  -
  -  public void remove(String queueId, org.jbossmq.SpyMessage message, Long txId) 
throws javax.jms.JMSException {
  -     LogInfo logInfo;
  +  }      
   
  -     synchronized (messageLogs) {
  -       logInfo = (LogInfo) 
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
  -     }
  +    
   
  -     if (logInfo == null)
  -       throw new javax.jms.JMSException("Destination was not initalized with the 
PersistenceManager");
  +    
   
  -     if( txId == null )
  -       logInfo.log.remove(message, txId);
  -     else {
  -       LinkedList tasks;
  -       synchronized (transactedTasks) {
  -             tasks = (LinkedList)transactedTasks.get(txId);
  -       }
  -       if( tasks == null )
  -             throw new javax.jms.JMSException("Transaction is not active 6.");
  -       synchronized(tasks){
  -             tasks.addLast(new Transaction(false, logInfo, message, txId));
  -       }
  -     }
  +    
   
  -  }    
  +    
   
     class Transaction {
        private LogInfo logInfo;
  @@ -294,12 +199,12 @@
          //TODO: make sure this lock is good enough
          synchronized (q) {
                for (int i = 0; i < rebuild.length; i++) {
  -               q.restoreMessage(rebuild[i], logInfo.queueId);
  +               q.restoreMessage(rebuild[i]);
                }
          }
        }
   
  -  }      
  +  }          
   
   /**
    * Insert the method's description here.
  @@ -316,4 +221,105 @@
                restore(server);
   
        }
  +
  +  public void add(org.jbossmq.SpyMessage message, Long txId) throws 
javax.jms.JMSException {
  +     LogInfo logInfo;
  +
  +     synchronized (messageLogs) {
  +       logInfo = (LogInfo) messageLogs.get(""+message.getJMSDestination());
  +     }
  +
  +     if (logInfo == null)
  +       throw new javax.jms.JMSException("Destination was not initalized with the 
PersistenceManager");
  +
  +     logInfo.log.add(message, txId);
  +
  +     if( txId != null ) {
  +       LinkedList tasks;
  +       synchronized( transactedTasks ) {
  +             tasks = (LinkedList)transactedTasks.get(txId);
  +       }
  +       if( tasks == null )
  +             throw new javax.jms.JMSException("Transaction is not active 5.");
  +       synchronized(tasks){
  +             tasks.addLast(new Transaction(true, logInfo, message, txId));
  +       }
  +     }
  +
  +  }    
  +
  +  public void destroyQueue( SpyDestination dest ) throws javax.jms.JMSException {
  +
  +     try {
  +
  +       //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
  +       //java.io.File file = new java.io.File(logDir.getFile());
  +
  +       LogInfo logInfo;
  +       synchronized(messageLogs){
  +             logInfo = (LogInfo)messageLogs.remove(""+dest);
  +       }
  +       if( logInfo == null )
  +             throw new JMSException("The persistence log was never initialized");
  +
  +       logInfo.log.close();
  +       //file.delete();
  +
  +     } catch (javax.jms.JMSException e) {
  +       throw e;
  +     } catch (Exception e) {
  +       javax.jms.JMSException newE = new javax.jms.JMSException("Invalid 
configuration.");
  +       newE.setLinkedException(e);
  +       throw newE;
  +     }
  +
  +  }    
  +
  +  public void initQueue( SpyDestination dest) throws javax.jms.JMSException {
  +     try {
  +       //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
  +
  +       MessageLog log = new MessageLog(datasource, dest.toString());
  +
  +       LogInfo info = new LogInfo(log, dest);
  +
  +       synchronized(messageLogs){
  +             messageLogs.put(""+dest, info);
  +       }
  +
  +     } catch (javax.jms.JMSException e) {
  +       throw e;
  +     } catch (Exception e) {
  +       javax.jms.JMSException newE = new javax.jms.JMSException("Invalid 
configuration.");
  +       newE.setLinkedException(e);
  +       throw newE;
  +     }
  +
  +  }        
  +
  +  public void remove(org.jbossmq.SpyMessage message, Long txId) throws 
javax.jms.JMSException {
  +     LogInfo logInfo;
  +
  +     synchronized (messageLogs) {
  +       logInfo = (LogInfo) messageLogs.get(""+message.getJMSDestination());
  +     }
  +
  +     if (logInfo == null)
  +       throw new javax.jms.JMSException("Destination was not initalized with the 
PersistenceManager");
  +
  +     if( txId == null )
  +       logInfo.log.remove(message, txId);
  +     else {
  +       LinkedList tasks;
  +       synchronized (transactedTasks) {
  +             tasks = (LinkedList)transactedTasks.get(txId);
  +       }
  +       if( tasks == null )
  +             throw new javax.jms.JMSException("Transaction is not active 6.");
  +       synchronized(tasks){
  +             tasks.addLast(new Transaction(false, logInfo, message, txId));
  +       }
  +     }
  +
  +  }    
   }
  
  
  
  1.3       +1 -1      
jbossmq/src/main/org/jbossmq/pm/jdbc/PersistenceManagerMBean.java
  
  Index: PersistenceManagerMBean.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/jdbc/PersistenceManagerMBean.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- PersistenceManagerMBean.java      2001/07/16 02:51:46     1.2
  +++ PersistenceManagerMBean.java      2001/07/28 00:33:38     1.3
  @@ -41,7 +41,7 @@
    *      
    *   @see <related>
    *   @author Vincent Sheffer ([EMAIL PROTECTED])
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public interface PersistenceManagerMBean
      extends org.jboss.util.ServiceMBean
  @@ -61,4 +61,4 @@
      // Public --------------------------------------------------------
   public java.lang.String getJmsDBPoolName();
   public void setJmsDBPoolName(java.lang.String newJmsDBPoolName);
  -}
  +}
  \ No newline at end of file
  
  
  
  1.2       +7 -7      jbossmq/src/main/org/jbossmq/pm/jdbc/TxLog.java
  
  Index: TxLog.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/jdbc/TxLog.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- TxLog.java        2001/07/11 02:52:16     1.1
  +++ TxLog.java        2001/07/28 00:33:38     1.2
  @@ -22,7 +22,7 @@
    * It is used to rollback transactions when the system restarts.
    *
    * @author: Jayesh Parayali ([EMAIL PROTECTED])
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class TxLog {
   
  @@ -38,11 +38,11 @@
     public TxLog(DataSource datasource) throws JMSException {
                  if (ds == null)
                ds = datasource;
  -  }    
  +  }      
   
     private final Connection getConnection() throws SQLException {
        return ds.getConnection();
  -  }    
  +  }      
   
     synchronized public Long createTx() throws JMSException {
        Long id = new Long(nextTransactionId++);
  @@ -74,7 +74,7 @@
                throwJMSException("Could not close database connection in transaction 
log (createTx).",e);
          }
        return id;
  -  }    
  +  }      
   
     synchronized public void commitTx(Long txId) throws JMSException {
        Connection con = null;
  @@ -104,7 +104,7 @@
          catch(SQLException e) {
                throwJMSException("Could not close database connection in transaction 
log (commitTx)",e);
          }
  -  }    
  +  }      
   
     synchronized public void rollbackTx(Long txId) throws JMSException {
        Connection con = null;
  @@ -134,7 +134,7 @@
          catch(SQLException e) {
                throwJMSException("Could not close database connection in transaction 
log (rollbackTx)",e);
          }
  -  }    
  +  }      
   
     synchronized public java.util.TreeSet restore() throws JMSException {
        TreeSet items = new TreeSet();;
  @@ -167,7 +167,7 @@
        }
   
        return items;
  -  }    
  +  }      
   
     /////////////////////////////////////////////////////////////////////
     // Private Methods
  @@ -176,6 +176,6 @@
        JMSException newE = new JMSException(message);
        newE.setLinkedException(e);
        throw newE;
  -  }    
  +  }      
   
   }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to