User: lqd Date: 02/02/19 08:11:22 Modified: src/main/org/jboss/mq/pm/rollinglogged PersistenceManager.java PersistenceManagerMBean.java Log: - add ability to set rollover size via JMX (was fixed at 1000 before) - yank dead (commented out) code - replace custom "DEBUG" logging to System.out with log4j - clean up ambiguous use of "SpyMessageLog log" -> change to "messageLog" Revision Changes Path 1.18 +46 -229 jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManager.java Index: PersistenceManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManager.java,v retrieving revision 1.17 retrieving revision 1.18 diff -u -r1.17 -r1.18 --- PersistenceManager.java 13 Feb 2002 04:26:38 -0000 1.17 +++ PersistenceManager.java 19 Feb 2002 16:11:22 -0000 1.18 @@ -39,15 +39,13 @@ * * @author David Maplesden ([EMAIL PROTECTED]) * @author <a href="mailto:[EMAIL PROTECTED]">David Jencks</a> - * @version $Revision: 1.17 $ + * @version $Revision: 1.18 $ */ public class PersistenceManager extends ServiceMBeanSupport implements org.jboss.mq.pm.PersistenceManager, PersistenceManagerMBean { - public final static int ROLL_OVER_SIZE = 1000; public final static String TRANS_FILE_NAME = "transactions.dat"; - public final static boolean DEBUG = false; protected static int MAX_POOL_SIZE = 50; @@ -74,7 +72,7 @@ TxManager txManager; private String dataDirectory; - + private int rollOverSize; private HashMap unrestoredMessages = new HashMap(); @@ -84,7 +82,8 @@ * * @exception javax.jms.JMSException Description of Exception */ - public PersistenceManager() throws javax.jms.JMSException + public PersistenceManager() + throws javax.jms.JMSException { txManager = new TxManager(this); } @@ -127,13 +126,21 @@ return dataDirectory; } + public void setRollOverSize( int rollOverSize ) + { + this.rollOverSize = rollOverSize; + } + + public int getRollOverSize() + { + return rollOverSize; + } 
public Object getInstance() { return this; } - /** * getTxManager method comment. * @@ -144,74 +151,14 @@ return txManager; } - /*public void initQueue(SpyDestination dest) throws javax.jms.JMSException - { - log.error("You called the wrong initQueue method"); - } - */ /** * #Description of the Method * * @param dest Description of Parameter * @exception javax.jms.JMSException Description of Exception */ - // public void initQueue(JMSDestination jmsDest) throws javax.jms.JMSException - //{ - // restoreDestination(jmsDest); - /* - SpyDestination dest = jmsDest.getSpyDestination(); - - String destName = dest.toString(); - //really? what about multiple subscribers on a topic? - queues.put(destName, dest); - //new - restoreDestination(jmsDest); - SpyTxLog txLog = null; - - // if called before we have been started there is no need to setup log files - if(currentTxLog == null) - return; - - HashMap logs; - synchronized (messageLogs) - { - logs = (HashMap)messageLogs.get(currentTxLog); - if (logs == null) - { - logs = new HashMap(); - messageLogs.put(currentTxLog, logs); - } - synchronized (logs) - { - LogInfo logInfo = (LogInfo)logs.get(destName); - - if (logInfo == null) - { - try - { - SpyMessageLog log = new SpyMessageLog(new File(dataDir, destName + ".dat" + numRollOvers)); - logInfo = new LogInfo(log, dest, currentTxLog); - logs.put(destName, logInfo); - } - catch (Exception e) - { - JMSException jme = new SpyJMSException("Error rolling over logs to new files."); - jme.setLinkedException(e); - throw jme; - } - } - } - } -*/ - // } - - /** - * #Description of the Method - * - * @param dest Description of Parameter - * @exception javax.jms.JMSException Description of Exception - */ - public void destroyQueue(SpyDestination dest) throws javax.jms.JMSException + public void destroyQueue(SpyDestination dest) + throws javax.jms.JMSException { try @@ -233,9 +180,9 @@ { throw new SpyJMSException("The persistence log was never initialized"); } - SpyMessageLog log = 
logInfo.log; - log.close(); - log.delete(); + SpyMessageLog messageLog = logInfo.log; + messageLog.close(); + messageLog.delete(); HashSet deleteLogs = new HashSet(); synchronized (messageLogs) @@ -256,9 +203,9 @@ } for (Iterator it = deleteLogs.iterator(); it.hasNext(); ) { - log = (SpyMessageLog)it.next(); - log.close(); - log.delete(); + messageLog = (SpyMessageLog)it.next(); + messageLog.close(); + messageLog.delete(); } } @@ -281,7 +228,8 @@ * Setup the data directory, where messages will be stored, connects * to the message cache and restores transactions. */ - public void startService() throws Exception + public void startService() + throws Exception { log.debug("Using new rolling logged persistence manager."); @@ -313,10 +261,10 @@ * @param txId Description of Parameter * @exception javax.jms.JMSException Description of Exception */ - public void add(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException + public void add(MessageReference messageRef, org.jboss.mq.pm.Tx txId) + throws javax.jms.JMSException { - SpyMessage message = messageRef.getMessage(); - //System.out.println("Add message "+Long.toHexString(message.messageId)+" in trans "+Long.toHexString(txId.longValue())+" to "+message.getJMSDestination()); + SpyMessage message = messageRef.getMessage(); LogInfo logInfo; SpyTxLog txLog = null; @@ -370,9 +318,9 @@ * @param txId Description of Parameter * @exception javax.jms.JMSException Description of Exception */ - public void commitPersistentTx(org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException + public void commitPersistentTx(org.jboss.mq.pm.Tx txId) + throws javax.jms.JMSException { - //System.out.println("Committing TX "+Long.toHexString(txId.longValue())); TxInfo info = null; LinkedList messagesToDelete = null; synchronized (transToTxLogs) @@ -396,7 +344,8 @@ * @return Description of the Returned Value * @exception javax.jms.JMSException Description of Exception */ - public org.jboss.mq.pm.Tx createPersistentTx() 
throws javax.jms.JMSException + public org.jboss.mq.pm.Tx createPersistentTx() + throws javax.jms.JMSException { org.jboss.mq.pm.Tx txId = null; SpyTxLog txLog = currentTxLog; @@ -416,11 +365,10 @@ * @param txId Description of Parameter * @exception javax.jms.JMSException Description of Exception */ - public void remove(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException + public void remove(MessageReference messageRef, org.jboss.mq.pm.Tx txId) + throws javax.jms.JMSException { - //System.out.println("Removing message "+Long.toHexString(message.messageId)+" in trans "+Long.toHexString(txId.longValue())+" from "+message.getJMSDestination()); - - SpyMessage message = messageRef.getMessage(); + SpyMessage message = messageRef.getMessage(); LogInfo logInfo; SpyTxLog txLog = ((LogInfo)messageRef.persistData).txLog; @@ -468,7 +416,8 @@ } } - public void restoreTransactions() throws javax.jms.JMSException + public void restoreTransactions() + throws javax.jms.JMSException { TreeSet committedTxs = new TreeSet(); HashMap txLogs = new HashMap(); @@ -541,8 +490,8 @@ rollOverLogs(); } - - public void restoreDestination(JMSDestination jmsDest) throws javax.jms.JMSException + public void restoreDestination(JMSDestination jmsDest) + throws javax.jms.JMSException { if (jmsDest instanceof JMSQueue) { @@ -602,7 +551,6 @@ Iterator m = messages.values().iterator(); while (m.hasNext()) { - //SpyMessage message = (SpyMessage)m.next(); MessageReference message = (MessageReference)m.next(); if (dest instanceof org.jboss.mq.SpyTopic) { @@ -623,140 +571,15 @@ logs.put(queueName, newQueueInfo(dest, currentTxLog)); } } - /** - * #Description of the Method - * - * @param server Description of Parameter - * @exception javax.jms.JMSException Description of Exception - */ - public void restore(org.jboss.mq.server.JMSServer server) throws javax.jms.JMSException - { - }/* - TreeSet committedTxs = new TreeSet(); - HashMap txLogs = new HashMap(); - java.io.File dir = 
dataDir; - java.io.File[] dataFiles = dir.listFiles(); - - for (int i = 0; i < dataFiles.length; ++i) - { - String name = dataFiles[i].getName(); - if (name.startsWith(TRANS_FILE_NAME)) - { - int index = name.indexOf(".dat"); - if (index < 0) - { - continue; - } - String sRollOver = name.substring(index + 4); - int rollOver = Integer.parseInt(sRollOver); - numRollOvers = Math.max(numRollOvers, rollOver + 1); - SpyTxLog txLog = new SpyTxLog(dataFiles[i]); - txLog.restore(committedTxs); - txLogs.put(new Integer(rollOver), txLog); - messageLogs.put(txLog, new HashMap()); - } - } - - if (!committedTxs.isEmpty()) - { - nextTxId = ((org.jboss.mq.pm.Tx)committedTxs.last()).longValue(); - } - - for (int i = 0; i < dataFiles.length; ++i) - { - String name = dataFiles[i].getName(); - if (!name.startsWith(TRANS_FILE_NAME)) - { - int index = name.indexOf(".dat"); - if (index < 0) - { - continue; - } - String sRollOver = name.substring(index + 4); - int rollOver = Integer.parseInt(sRollOver); - //check whether there is a transaction log for this message log file. 
- SpyTxLog txLog = (SpyTxLog)txLogs.get(new Integer(rollOver)); - if(txLog == null) - continue; - String key = name.substring(0, name.length() - (sRollOver.length() + 4)); - SpyMessageLog messageLog = new SpyMessageLog(dataFiles[i]); -<<<<<<< PersistenceManager.java - SpyMessage[] messages = messageLog.restore(committedTxs); - SpyTxLog txLog = (SpyTxLog)txLogs.get(new Integer(rollOver)); -======= - MessageReference[] messages = messageLog.restore(commitedTxs); ->>>>>>> 1.12 - SpyDestination dest = (SpyDestination)queues.get(key); - if (dest != null) - { - JMSDestination q = server.getJMSDestination(dest); - LogInfo info = new LogInfo(messageLog, dest, txLog); - info.liveMessages = messages.length; - HashMap logs = (HashMap)messageLogs.get(txLog); - logs.put(key, info); - //TODO: make sure this lock is good enough - synchronized (q) - { - for (int j = 0; j < messages.length; j++) - { - messages[j].persistData = info; - if (dest instanceof org.jboss.mq.SpyTopic) - { - SpyMessage mesg = messages[j].getMessage(); - mesg.header.durableSubscriberID = ((org.jboss.mq.SpyTopic)dest).getDurableSubscriptionID(); - messages[j].invalidate(); // since we updated the message - } - q.restoreMessage(messages[j]); - } - } - } - } - } - - for (Iterator it = txLogs.values().iterator(); it.hasNext(); ) - { - checkCleanup((SpyTxLog)it.next()); - } - - try - { - - File txLogFile = new File(dataDir, TRANS_FILE_NAME + numRollOvers); - currentTxLog = new SpyTxLog(txLogFile); - messageLogs.put(currentTxLog, new HashMap()); - - for (Iterator it = queues.values().iterator(); it.hasNext(); ) - { - SpyDestination dest = (SpyDestination)it.next(); - String key = "" + dest; - File logFile = new File(dataDir, dest.toString() + ".dat" + numRollOvers); - SpyMessageLog log = new SpyMessageLog(logFile); - - synchronized (messageLogs) - { - LogInfo logInfo = new LogInfo(log, dest, currentTxLog); - HashMap logs = (HashMap)messageLogs.get(currentTxLog); - logs.put(key, logInfo); - } - } - - } - catch 
(Exception e) - { - javax.jms.JMSException newE = new javax.jms.JMSException("Invalid configuration."); - newE.setLinkedException(e); - throw newE; - } - } - */ /** * #Description of the Method * * @param txId Description of Parameter * @exception javax.jms.JMSException Description of Exception */ - public void rollbackPersistentTx(org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException + public void rollbackPersistentTx(org.jboss.mq.pm.Tx txId) + throws javax.jms.JMSException { TxInfo info = null; LinkedList messagesToDelete = null; @@ -804,7 +627,6 @@ } } - protected void releaseTxInfo(TxInfo list) { if (listPool.size() < MAX_POOL_SIZE) @@ -815,11 +637,6 @@ } } - - - - - protected void deleteMessages(LinkedList messages) throws javax.jms.JMSException { for (Iterator it = messages.iterator(); it.hasNext(); ) @@ -840,11 +657,11 @@ int max = queues.size(); if (max == 0) { - max = ROLL_OVER_SIZE; + max = rollOverSize; } else { - max *= ROLL_OVER_SIZE; + max *= rollOverSize; } if (++messageCounter > max) { @@ -865,9 +682,6 @@ for (Iterator it = queues.values().iterator(); it.hasNext(); ) { SpyDestination spyDest = (SpyDestination)it.next(); - //String destName = spyDest.toString(); - // SpyMessageLog log = new SpyMessageLog(new File(dataDir, destName + ".dat" + numRollOvers)); - //LogInfo logInfo = new LogInfo(log, dest, newTxLog); logs.put(spyDest.toString(), newQueueInfo(spyDest, newTxLog)); } SpyTxLog oldLog = currentTxLog; @@ -946,9 +760,9 @@ { return; } - if (DEBUG) + if ( log.isDebugEnabled() ) { - System.out.println("Cleaning up"); + log.debug( "Cleaning up" ); } //close and delete all logs, remove data from data structures. 
synchronized (messageLogs) @@ -1013,3 +827,6 @@ } } } +/* +vim:tabstop=3:et:shiftwidth=3 +*/ 1.7 +18 -1 jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManagerMBean.java Index: PersistenceManagerMBean.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManagerMBean.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- PersistenceManagerMBean.java 13 Feb 2002 04:26:38 -0000 1.6 +++ PersistenceManagerMBean.java 19 Feb 2002 16:11:22 -0000 1.7 @@ -13,7 +13,7 @@ * MBean interface for the JBossMQ JMX service. * * @author Vincent Sheffer ([EMAIL PROTECTED]) - * @version $Revision: 1.6 $ + * @version $Revision: 1.7 $ */ public interface PersistenceManagerMBean extends ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean @@ -32,4 +32,21 @@ */ void setDataDirectory(String newDataDirectory); + /** + * Gets maximum number of messages until log rolls over + * + * @return number of messages before log rolls over + */ + public int getRollOverSize(); + + /** + * Sets the maximum number of messages before log rolls over + * @param rollOverSize The maximum number of messages before + * rollover occurs + */ + public void setRollOverSize( int rollOverSize ); + } +/* +vim:tabstop=3:et:shiftwidth=3 +*/
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development