User: pkendall
  Date: 01/08/08 18:18:28

  Modified:    src/main/org/jbossmq/pm/logged PersistenceManager.java
                        SpyMessageLog.java SpyMessageLogTester.java
                        SpyTxLog.java
  Log:
  Major updates (especially to topics).
  Speed improvements.
  Make JVM IL work (by using a singleton JMSServer).
  Message Listeners re-implemented using a client-side thread.
  
  Revision  Changes    Path
  1.4       +21 -19    jbossmq/src/main/org/jbossmq/pm/logged/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/PersistenceManager.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- PersistenceManager.java   2001/07/30 03:39:15     1.3
  +++ PersistenceManager.java   2001/08/09 01:18:28     1.4
  @@ -19,8 +19,8 @@
   import org.jbossmq.server.JMSDestination;
   import org.jbossmq.SpyMessage;
   import org.jbossmq.SpyDestination;
  +import org.jbossmq.SpyJMSException;
   
  -
   import javax.naming.InitialContext;
   import org.jbossmq.pm.TxManager;
   import org.jboss.util.ServiceMBeanSupport;
  @@ -32,13 +32,18 @@
    *
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    *
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
  -public class PersistenceManager 
  -     extends ServiceMBeanSupport  
  +public class PersistenceManager
  +     extends ServiceMBeanSupport
        implements org.jbossmq.pm.PersistenceManager, PersistenceManagerMBean, 
MBeanRegistration {
  -
   
  +     /**
  +      * NewPersistenceManager constructor.
  +      */
  +     public PersistenceManager() throws javax.jms.JMSException {
  +             txManager = new TxManager( this );
  +     }
   
        private String dataDirectory;
        // Log file used to store commited transactions.
  @@ -58,19 +63,16 @@
        }
   
   
  -     public PersistenceManager() {
  -             txManager = new TxManager(this);
  -     }
   
  -     public Long createPersistentTx() throws javax.jms.JMSException {
  +     public org.jbossmq.pm.Tx createPersistentTx() throws javax.jms.JMSException {
                return spyTxLog.createTx();
        }
   
  -     public void commitPersistentTx(Long txId) throws javax.jms.JMSException {
  +     public void commitPersistentTx(org.jbossmq.pm.Tx txId) throws 
javax.jms.JMSException {
                spyTxLog.commitTx(txId);
        }
   
  -     public void rollbackPersistentTx(Long txId) throws javax.jms.JMSException {
  +     public void rollbackPersistentTx(org.jbossmq.pm.Tx txId) throws 
javax.jms.JMSException {
                spyTxLog.rollbackTx(txId);
        }
   
  @@ -111,15 +113,15 @@
        public void initService() throws Exception {
   
                URL configFile = getClass().getClassLoader().getResource("jboss.jcml");
  -             
  +
                dataDirURL = new URL(configFile, dataDirectory);
                URL txLogFile = new URL(dataDirURL, "transactions.dat");
                spyTxLog = new SpyTxLog(txLogFile.getFile());
  -             
  +
            //Get an InitialContext
  -             JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );                
  +             JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );
                server.setPersistenceManager(this);
  -             
  +
        }
   
        public void restore(org.jbossmq.server.JMSServer server) throws 
javax.jms.JMSException {
  @@ -160,12 +162,12 @@
   
        public void startService() throws Exception {
   
  -             JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );                
  +             JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jbossmq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[] {}, new String[] {} );
                restore(server);
  -             
  +
        }
   
  -     public void add(org.jbossmq.SpyMessage message, Long txId) throws 
javax.jms.JMSException {
  +     public void add(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId) throws 
javax.jms.JMSException {
   
                LogInfo logInfo;
   
  @@ -189,7 +191,7 @@
   
                        SpyMessageLog log = (SpyMessageLog)messageLogs.remove(""+dest);
                        if( log == null )
  -                             throw new JMSException("The persistence log was never 
initialized");
  +                             throw new SpyJMSException("The persistence log was 
never initialized");
                        log.close();
   
                        file.delete();
  @@ -225,7 +227,7 @@
   
        }
   
  -     public void remove(org.jbossmq.SpyMessage message, Long txId) throws 
javax.jms.JMSException {
  +     public void remove(org.jbossmq.SpyMessage message, org.jbossmq.pm.Tx txId) 
throws javax.jms.JMSException {
   
                LogInfo logInfo;
   
  
  
  
  1.3       +40 -40    jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLog.java
  
  Index: SpyMessageLog.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLog.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyMessageLog.java        2001/07/16 02:51:46     1.2
  +++ SpyMessageLog.java        2001/08/09 01:18:28     1.3
  @@ -6,35 +6,35 @@
    */
   package org.jbossmq.pm.logged;
   
  +import org.jbossmq.SpyJMSException;
  +
   import java.io.IOException;
   import java.io.Serializable;
   
  -
  -
   import javax.jms.JMSException;
   
   import org.jbossmq.SpyMessage;
   
   /**
  - * This is used to keep a log of SpyMessages arriving and leaving 
  + * This is used to keep a log of SpyMessages arriving and leaving
    * a queue.  The log can be used reconstruct the queue in case of
    * provider failure.  Integrety is kept by the use of an ObjectIntegrityLog.
    *
    * @author: Hiram Chirino ([EMAIL PROTECTED])
  - * @version $Revision: 1.2 $    
  + * @version $Revision: 1.3 $
    */
   public class SpyMessageLog {
   
        /////////////////////////////////////////////////////////////////////
        // Attributes
  -     /////////////////////////////////////////////////////////////////////   
  +     /////////////////////////////////////////////////////////////////////
        private ObjectIntegrityLog transactionLog;
        private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
        private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
   
        /////////////////////////////////////////////////////////////////////
        // Helper Inner Classes
  -     /////////////////////////////////////////////////////////////////////   
  +     /////////////////////////////////////////////////////////////////////
        static class MessageAddedRecord implements Serializable {
                long messageId;
                boolean isTransacted;
  @@ -42,7 +42,7 @@
                SpyMessage message;
                private final static long serialVersionUID = 235726945332013954L;
        }
  -     
  +
        static class MessageRemovedRecord implements Serializable {
                boolean isTransacted;
                long transactionId;
  @@ -50,33 +50,33 @@
                private final static long serialVersionUID = 235726945332013955L;
        }
   
  -             
  +
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public SpyMessageLog(String fileName) throws JMSException {
                try {
  -                     transactionLog = new ObjectIntegrityLog(fileName);      
  +                     transactionLog = new ObjectIntegrityLog(fileName);
                } catch ( IOException e ) {
                        throwJMSException("Could not open the queue's tranaction log: 
"+fileName,e);
                }
        }
  +
   
  -     
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
        synchronized public void close() throws JMSException {
                try{
  -                     transactionLog.close();         
  +                     transactionLog.close();
                } catch ( IOException e ) {
                        throwJMSException("Could not close the queue's tranaction 
log.",e);
                }
        }
   
  -     synchronized public void add( SpyMessage message, Long transactionId ) throws 
JMSException {
  +     synchronized public void add( SpyMessage message, org.jbossmq.pm.Tx 
transactionId ) throws JMSException {
                try{
  -                     
  +
                        messageAddedRecord.message = message;
                        messageAddedRecord.messageId = message.messageId;
                        if( transactionId == null )     {
  @@ -85,19 +85,19 @@
                                messageAddedRecord.isTransacted = true;
                                messageAddedRecord.transactionId = 
transactionId.longValue();
                        }
  -                             
  +
                        transactionLog.add(messageAddedRecord);
                        transactionLog.commit();
  -                     
  +
                } catch ( IOException e ) {
                        throwJMSException("Could not write to the tranaction log.",e);
                }
  -             
  -     }       
  -     
  -     synchronized public void remove( SpyMessage message, Long transactionId ) 
throws JMSException {
  +
  +     }
  +
  +     synchronized public void remove( SpyMessage message, org.jbossmq.pm.Tx 
transactionId ) throws JMSException {
                try{
  -                     
  +
                        messageRemovedRecord.messageId = message.messageId;
                        if( transactionId == null ) {
                                messageRemovedRecord.isTransacted = false;
  @@ -107,25 +107,25 @@
                        }
                        transactionLog.add(messageRemovedRecord);
                        transactionLog.commit();
  -                     
  +
                } catch ( IOException e ) {
                        throwJMSException("Could not write to the queue's tranaction 
log.",e);
                }
   
  -     }       
  -     
  +     }
  +
        synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws 
JMSException {
   
                java.util.HashMap messageIndex = new java.util.HashMap();
  -                     
  -             try {   
  +
  +             try {
                        ObjectIntegrityLog.IndexItem objects[] = 
transactionLog.toIndex();
  -                     
  +
                        for( int i=0; i < objects.length; i++ ) {
  -                             
  +
                                Object o = objects[i].record;
                                if( o instanceof MessageAddedRecord ) {
  -                                     
  +
                                        MessageAddedRecord r = (MessageAddedRecord)o;
                                        r.message.messageId = r.messageId;
   
  @@ -134,11 +134,11 @@
                                                // commited... so drop this message
                                                continue;
                                        }
  -                                     
  +
                                        messageIndex.put( new Long(r.messageId), 
objects[i]);
  -                                     
  +
                                } else if( o instanceof MessageRemovedRecord ) {
  -                                     
  +
                                        MessageRemovedRecord r = 
(MessageRemovedRecord)o;
   
                                        if( r.isTransacted && !commited.contains(new 
Long(r.transactionId)) ) {
  @@ -146,11 +146,11 @@
                                                // commited... so drop this message
                                                continue;
                                        }
  -                                     
  +
                                        messageIndex.remove( new Long(r.messageId));
  -                                     
  +
                                }
  -                             
  +
                        }
                } catch ( Exception e ) {
                        throwJMSException("Could not rebuild the queue from the 
queue's tranaction log.",e);
  @@ -162,13 +162,13 @@
                        ObjectIntegrityLog.IndexItem item = 
(ObjectIntegrityLog.IndexItem)iter.next();
                        rc[i] = ((MessageAddedRecord)item.record).message;
                }
  -             return rc;              
  -     }       
  -     
  +             return rc;
  +     }
  +
        private void throwJMSException(String message, Exception e) throws 
JMSException {
  -             JMSException newE = new JMSException(message);
  +             JMSException newE = new SpyJMSException(message);
                newE.setLinkedException(e);
  -             throw newE;             
  +             throw newE;
        }
  -     
  +
   }
  
  
  
  1.3       +13 -13    jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLogTester.java
  
  Index: SpyMessageLogTester.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/SpyMessageLogTester.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyMessageLogTester.java  2001/07/16 02:51:46     1.2
  +++ SpyMessageLogTester.java  2001/08/09 01:18:28     1.3
  @@ -12,10 +12,10 @@
   
   /**
    * This class was used to perform unit testing on the SpyMessageLog/SpyTxLog
  - * 
    *
  + *
    * @author: Hiram Chirino ([EMAIL PROTECTED])
  - * @version $Revision: 1.2 $    
  + * @version $Revision: 1.3 $
    */
   public class SpyMessageLogTester {
   
  @@ -27,10 +27,10 @@
   
                SpyTxLog tm = new SpyTxLog("SpyTxManager1.dat");
                SpyMessageLog log = new SpyMessageLog("SpyMessageLog1.dat");
  -             
  -             try{    
  +
  +             try{
   
  -                     java.util.TreeSet commited = tm.restore();                     
         
  +                     java.util.TreeSet commited = tm.restore();
                        SpyMessage[] queue = log.restore(commited);
   
                        System.out.println("Recovered :"+queue.length+" message from 
the message log");
  @@ -39,9 +39,9 @@
                                System.out.println("  #"+i+": "+queue[i]);
                                maxMessageId = Math.max(maxMessageId, 
queue[i].messageId );
                        }
  +
  +                     org.jbossmq.pm.Tx tx1 = tm.createTx();
   
  -                     Long tx1 = tm.createTx();
  -                     
                        long first = ++maxMessageId;
                        add(log, first,tx1);
                        long second = ++maxMessageId;
  @@ -51,14 +51,14 @@
                        System.out.println("Commiting");
                        tm.commitTx(tx1);
   
  -                     Long tx2 = tm.createTx();
  +                     org.jbossmq.pm.Tx tx2 = tm.createTx();
                        add(log, first,tx2);
   
                        System.out.println("Rolling back");
                        tm.rollbackTx(tx2);
   
                        add(log, second+1, null);
  -                     
  +
                        System.exit(0);
                } finally {
                        log.close();
  @@ -67,17 +67,17 @@
        }
   
   
  -     public static void add(SpyMessageLog log, long messageId, Long txid) throws 
Exception {
  +     public static void add(SpyMessageLog log, long messageId, org.jbossmq.pm.Tx 
txid) throws Exception {
   
                SpyTextMessage m = new SpyTextMessage();
                m.messageId = messageId;
                m.setText("Hello World #"+m.messageId);
                System.out.println("Adding message: "+m+",tx="+txid);
                log.add(m,txid);
  +
  +     }
   
  -     }       
  -     
  -     public static void remove(SpyMessageLog log, long messageId, Long txid) throws 
Exception {
  +     public static void remove(SpyMessageLog log, long messageId, org.jbossmq.pm.Tx 
txid) throws Exception {
   
                SpyTextMessage m = new SpyTextMessage();
                m.messageId = messageId;
  
  
  
  1.3       +22 -21    jbossmq/src/main/org/jbossmq/pm/logged/SpyTxLog.java
  
  Index: SpyTxLog.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/pm/logged/SpyTxLog.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyTxLog.java     2001/07/16 02:51:46     1.2
  +++ SpyTxLog.java     2001/08/09 01:18:28     1.3
  @@ -5,7 +5,8 @@
    * See terms of license at gnu.org.
    */
   package org.jbossmq.pm.logged;
  - 
  +import org.jbossmq.SpyJMSException;
  +
   import java.io.Serializable;
   import java.io.IOException;
   
  @@ -15,7 +16,7 @@
    * This is used to keep a log of commited transactions.
    *
    * @author: Hiram Chirino ([EMAIL PROTECTED])
  - * @version $Revision: 1.2 $    
  + * @version $Revision: 1.3 $
    */
   public class SpyTxLog {
   
  @@ -24,7 +25,7 @@
        /////////////////////////////////////////////////////////////////////
        private ObjectIntegrityLog transactionLog;
        private long nextTransactionId = Long.MIN_VALUE;
  -     
  +
        /////////////////////////////////////////////////////////////////////
        // Constructors
        /////////////////////////////////////////////////////////////////////
  @@ -41,35 +42,35 @@
        /////////////////////////////////////////////////////////////////////
        synchronized public void close() throws JMSException {
                try{
  -                     transactionLog.close();         
  +                     transactionLog.close();
                } catch ( IOException e ) {
                        throwJMSException("Could not close the queue's tranaction 
log.",e);
                }
        }
  -     
  -     synchronized public void commitTx(Long id) throws JMSException {
  -                             
  +
  +     synchronized public void commitTx(org.jbossmq.pm.Tx id) throws JMSException {
  +
                try {
                        transactionLog.add(id);
                        transactionLog.commit();
                } catch ( IOException e ) {
                        throwJMSException("Could not create a new transaction.",e);
                }
  -             
  +
        }
  -     
  -     synchronized public Long createTx() throws JMSException {
  -             return new Long(nextTransactionId++);
  +
  +     synchronized public org.jbossmq.pm.Tx createTx() throws JMSException {
  +             return new org.jbossmq.pm.Tx(nextTransactionId++);
        }
  -     
  +
        synchronized public java.util.TreeSet restore() throws JMSException {
  -             
  +
                java.util.TreeSet items=null;
                try {
                        items = transactionLog.toTreeSet();
                } catch ( Exception e ) {
                        throwJMSException("Could not restore the transaction log.",e);
  -             }               
  +             }
   
                long maxId = Long.MIN_VALUE;
                java.util.Iterator iter = items.iterator();
  @@ -81,20 +82,20 @@
   
                nextTransactionId = maxId+1;
                return items;
  -                     
  +
        }
   
  -     synchronized public void rollbackTx(Long txId) throws JMSException {
  -                     
  +     synchronized public void rollbackTx(org.jbossmq.pm.Tx txId) throws 
JMSException {
  +
        }
  -     
  +
        /////////////////////////////////////////////////////////////////////
        // Private Methods
        /////////////////////////////////////////////////////////////////////
        private void throwJMSException(String message, Exception e) throws 
JMSException {
  -             JMSException newE = new JMSException(message);
  +             JMSException newE = new SpyJMSException(message);
                newE.setLinkedException(e);
  -             throw newE;             
  +             throw newE;
        }
  -     
  +
   }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to