User: chirino Date: 01/11/13 20:23:27 Modified: src/main/org/jboss/mq/server MessageCache.java MessageCacheMBean.java MessageReference.java Log: Factored out a CacheStore object from the message store. This should lay the groundwork needed so that the MessageCache can use a PM for saving and loading messages. Also fixed a small bug in the file PM. It was not properly restoring the message after a server restart. Revision Changes Path 1.7 +25 -52 jbossmq/src/main/org/jboss/mq/server/MessageCache.java Index: MessageCache.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/MessageCache.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- MessageCache.java 2001/11/14 01:53:40 1.6 +++ MessageCache.java 2001/11/14 04:23:27 1.7 @@ -15,6 +15,7 @@ import org.jboss.system.ServiceMBeanSupport; import org.jboss.mq.SpyMessage; import javax.jms.JMSException; +import org.jboss.mq.pm.CacheStore; /** * This class implements a Message cache so that larger amounts of messages @@ -23,7 +24,7 @@ * later. * * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> - * @version $Revision: 1.6 $ + * @version $Revision: 1.7 $ */ public class MessageCache extends ServiceMBeanSupport implements MessageCacheMBean, MBeanRegistration, Runnable { @@ -39,8 +40,9 @@ int cacheHits = 0; int cacheMisses = 0; - private File dataFile; - private String dataDirectory; + CacheStore cacheStore; + ObjectName cacheStoreObjectName; + private Thread referenceSoftner; private long highMemoryMark = 1024L * 1000 * 16; @@ -219,29 +221,20 @@ ////////////////////////////////////////////////////////////////////////////////// // Perisitence methods used by the MessageReference. - // TODO: delegate this work to a PM. 
////////////////////////////////////////////////////////////////////////////////// - SpyMessage loadFromStorage(MessageReference mh) throws IOException, ClassNotFoundException + SpyMessage loadFromStorage(MessageReference mh) throws JMSException { - File f = new File(dataFile, "Message-" + mh.referenceId); - ObjectInputStream is = new ObjectInputStream(new BufferedInputStream(new FileInputStream(f))); - Object rc = is.readObject(); - is.close(); - return (SpyMessage) rc; + return (SpyMessage)cacheStore.loadFromStorage(mh); } - void saveToStorage(MessageReference mh, SpyMessage message) throws IOException + void saveToStorage(MessageReference mh, SpyMessage message) throws JMSException { - File f = new File(dataFile, "Message-" + mh.referenceId); - ObjectOutputStream os = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(f))); - os.writeObject(message); - os.close(); + cacheStore.saveToStorage(mh, message); } - void removeFromStorage(MessageReference mh) throws IOException + void removeFromStorage(MessageReference mh) throws JMSException { - File f = new File(dataFile, "Message-" + mh.referenceId); - f.delete(); + cacheStore.removeFromStorage(mh); } ////////////////////////////////////////////////////////////////////////////////// @@ -256,29 +249,15 @@ */ protected void startService() throws Exception { - File jbossHome = new File(System.getProperty("jboss.system.home")); - dataFile = new File(jbossHome, dataDirectory); - log.debug("Data directory set to: " + dataFile.getCanonicalPath()); - - dataFile.mkdirs(); - if (!dataFile.isDirectory()) - throw new Exception("The configured data directory is not valid: " + dataDirectory); - - // Clean out the directory of any previous files. 
- File files[] = dataFile.listFiles(); - log.debug("Removing " + files.length + " file(s) from: " + dataFile.getCanonicalPath()); - for (int i = 0; i < files.length; i++) - { - files[i].delete(); - } - + + cacheStore = (CacheStore)getServer().invoke(cacheStoreObjectName, "getInstance", new Object[] {}, new String[] {}); + if (getState() == ServiceMBeanSupport.STARTED) throw new Exception("Cannot be initialized from the current state"); referenceSoftner = new Thread(this, "JBossMQ Cache Reference Softner"); referenceSoftner.setDaemon(true); referenceSoftner.start(); - } /** @@ -293,22 +272,6 @@ referenceSoftner = null; } - /** - * Gets the dataDirectory - * @return Returns a String - */ - public String getDataDirectory() - { - return dataDirectory; - } - /** - * Sets the dataDirectory - * @param dataDirectory The dataDirectory to set - */ - public void setDataDirectory(String dataDirectory) - { - this.dataDirectory = dataDirectory; - } /** * Gets the hardRefCacheSize @@ -408,6 +371,8 @@ } /** + * TODO: Update so that it sets a CacheStore + * * This test creates 5000 x 100K messages and places them * in the MessageCache. With out a cache this would be * 500 Megs of memory needed. 
The cache will allow us to @@ -419,7 +384,8 @@ MessageCache cache = new MessageCache(); File tempDir = new File("Temp-" + System.currentTimeMillis()); tempDir.mkdirs(); - cache.setDataDirectory(tempDir.getCanonicalPath()); + + //cache.setDataDirectory(tempDir.getCanonicalPath()); cache.setHighMemoryMark(40); cache.setMaxMemoryMark(60); cache.start(); @@ -468,4 +434,11 @@ log.info("Cache Misses=" + cache.getCacheMisses()); } + /** + * @see MessageCacheMBean#setCacheStore(ObjectName) + */ + public void setCacheStore(ObjectName cacheStore) { + cacheStoreObjectName = cacheStore; + } + } 1.3 +10 -19 jbossmq/src/main/org/jboss/mq/server/MessageCacheMBean.java Index: MessageCacheMBean.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/MessageCacheMBean.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- MessageCacheMBean.java 2001/11/10 21:38:05 1.2 +++ MessageCacheMBean.java 2001/11/14 04:23:27 1.3 @@ -6,27 +6,16 @@ */ package org.jboss.mq.server; import org.jboss.system.ServiceMBean; +import javax.management.ObjectName; /** * Defines the managment interface that is exposed to the MessageCache * * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> - * @version $Revision: 1.2 $ + * @version $Revision: 1.3 $ */ public interface MessageCacheMBean extends ServiceMBean { /** - * Gets the dataDirectory - * @return Returns a String - */ - public String getDataDirectory(); - - /** - * Sets the dataDirectory - * @param dataDirectory The dataDirectory to set - */ - public void setDataDirectory(String dataDirectory); - - /** * Gets the hardRefCacheSize * @return Returns a int */ @@ -62,27 +51,29 @@ * @return Returns a long */ public long getHighMemoryMark(); - + /** * Sets the highMemoryMark * @param highMemoryMark The highMemoryMark to set */ public void setHighMemoryMark(long highMemoryMark); - + /** * Gets the maxMemoryMark * @return Returns a long */ public 
long getMaxMemoryMark(); - + /** * Sets the maxMemoryMark * @param maxMemoryMark The maxMemoryMark to set */ public void setMaxMemoryMark(long maxMemoryMark); - - + + public long getCurrentMemoryUsage(); - + public MessageCache getInstance(); + + void setCacheStore(ObjectName cacheStore); } 1.5 +60 -68 jbossmq/src/main/org/jboss/mq/server/MessageReference.java Index: MessageReference.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/MessageReference.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- MessageReference.java 2001/11/14 01:53:40 1.4 +++ MessageReference.java 2001/11/14 04:23:27 1.5 @@ -5,10 +5,16 @@ * See terms of license at gnu.org. */ package org.jboss.mq.server; -import java.lang.ref.*; -import org.jboss.mq.SpyMessage; -import org.jboss.mq.SpyJMSException; + +import java.lang.ref.ReferenceQueue; +import java.lang.ref.SoftReference; + import javax.jms.JMSException; + +import org.apache.log4j.Category; +import org.jboss.mq.SpyMessage; +import org.jboss.mq.SpyMessage.Header; + /** * This class holds a reference to an actual Message. Where it is actually * at may vary. The reference it holds may be a: @@ -19,7 +25,7 @@ * </ul> * * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> - * @version $Revision: 1.4 $ + * @version $Revision: 1.5 $ */ public class MessageReference implements Comparable { org.apache.log4j.Category cat = org.apache.log4j.Category.getInstance(MessageReference.class); @@ -32,20 +38,20 @@ } } - Long referenceId; - MessageCache messageCache; - SpyMessage hardReference; - MessageSoftReference softReference; + public Long referenceId; + public MessageCache messageCache; + public SpyMessage hardReference; + public MessageSoftReference softReference; boolean isStored; // These fields are copied over from the messae itself.. // they are used too often to not have them handy. 
- byte jmsPriority; - long messageId; + public byte jmsPriority; + public long messageId; // This object could be used by the PM to associate some info public transient Object persistData = null; - MessageReference(){ + MessageReference() { } //init and reset methods for use by object pool @@ -58,7 +64,7 @@ this.isStored = false; } - void reset(){ + void reset() { //clear refs so gc can collect unused objects this.messageCache = null; this.hardReference = null; @@ -94,16 +100,12 @@ return getMessage().header; } - void clear() throws SpyJMSException { + void clear() throws JMSException { cat.debug("clear lock aquire"); synchronized (this) { - try { - if (isStored) { - messageCache.removeFromStorage(this); - isStored = false; - } - } catch (java.io.IOException e) { - throw new SpyJMSException("Could not remove the message from storage", e); + if (isStored) { + messageCache.removeFromStorage(this); + isStored = false; } cat.debug("clear lock relased"); } @@ -116,29 +118,25 @@ synchronized void makeSoft() throws JMSException { cat.debug("makeSoft lock aquire"); synchronized (this) { - try { - if (softReference != null) - return; - - if (!isStored) { - messageCache.saveToStorage(this, hardReference); - isStored = true; - } - - softReference = new MessageSoftReference(this, hardReference, messageCache.referenceQueue); - - //I am undecided about whether to release this message back to the message pool or not - //the whole point of this caching business is to release the memory this message is using up - //if we are going to do it then this is the place... - //MessagePool.releaseMessage(hardReference); - - // We don't need the hard ref anymore.. 
- hardReference = null; - - messageCache.softRefCacheSize++; - } catch (java.io.IOException e) { - throw new SpyJMSException("Message Cache IO error: ", e); + if (softReference != null) + return; + + if (!isStored) { + messageCache.saveToStorage(this, hardReference); + isStored = true; } + + softReference = new MessageSoftReference(this, hardReference, messageCache.referenceQueue); + + //I am undecided about whether to release this message back to the message pool or not + //the whole point of this caching business is to release the memory this message is using up + //if we are going to do it then this is the place... + //MessagePool.releaseMessage(hardReference); + + // We don't need the hard ref anymore.. + hardReference = null; + + messageCache.softRefCacheSize++; cat.debug("makeSoft lock released"); } } @@ -146,32 +144,26 @@ synchronized void makeHard() throws JMSException { cat.debug("makeHard lock aquire"); synchronized (this) { - try { - // allready hard - if (hardReference != null) - return; - - // Get the object via the softref - hardReference = (SpyMessage) softReference.get(); - - // It might have been removed from the cache due to memory constraints - if (hardReference == null) { - // load it from disk. - hardReference = messageCache.loadFromStorage(this); - messageCache.cacheMisses++; - } else { - messageCache.cacheHits++; - messageCache.softRefCacheSize--; - } - - // Since we have hard ref, we do not need the soft one. - softReference = null; - - } catch (ClassNotFoundException e) { - throw new SpyJMSException("Message Cache IO error: ", e); - } catch (java.io.IOException e) { - throw new SpyJMSException("Message Cache IO error: ", e); + // allready hard + if (hardReference != null) + return; + + // Get the object via the softref + hardReference = (SpyMessage) softReference.get(); + + // It might have been removed from the cache due to memory constraints + if (hardReference == null) { + // load it from disk. 
+ hardReference = messageCache.loadFromStorage(this); + messageCache.cacheMisses++; + } else { + messageCache.cacheHits++; + messageCache.softRefCacheSize--; } + + // Since we have hard ref, we do not need the soft one. + softReference = null; + cat.debug("makeHard lock released"); } } @@ -201,4 +193,4 @@ return (int) (messageId - sm.messageId); } -} +} \ No newline at end of file
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development