Author: rajdavies
Date: Fri Jun 20 05:52:41 2008
New Revision: 669879

URL: http://svn.apache.org/viewvc?rev=669879&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1814

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java?rev=669879&r1=669878&r2=669879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java Fri Jun 20 05:52:41 2008
@@ -300,6 +300,13 @@
     public void setValueSize(int valueSize) {
         this.valueSize = valueSize;
     }
+
+    void copyIndex(IndexItem other) {
+        this.offset=other.offset;
+        this.active=other.active;
+        this.previousItem=other.previousItem;
+        this.nextItem=other.nextItem;
+    }
 
     /**
      * @return print of 'this'

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?rev=669879&r1=669878&r2=669879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java Fri Jun 20 05:52:41 2008
@@ -49,6 +49,7 @@
     private IndexItem lastFree;
     private boolean dirty;
     private final AtomicLong storeSize;
+    private int freeSize = 0;
 
     public IndexManager(File directory, String name, String mode, DataManager redoLog, AtomicLong storeSize) throws IOException {
         this.directory = directory;
@@ -64,7 +65,11 @@
     }
 
     public synchronized IndexItem getIndex(long offset) throws IOException {
-        return reader.readItem(offset);
+        IndexItem result = null;
+        if (offset >= 0) {
+            result = reader.readItem(offset);
+        }
+        return result;
     }
 
     public synchronized IndexItem refreshIndex(IndexItem item) throws IOException {
@@ -80,8 +85,16 @@
             lastFree = item;
         } else {
             lastFree.setNextItem(item.getOffset());
+            if (lastFree.equals(firstFree)) {
+                firstFree=new IndexItem();
+                firstFree.copyIndex(lastFree);
+                writer.updateIndexes(firstFree);
+            }
+            writer.updateIndexes(lastFree);
+            lastFree=item;
         }
         writer.updateIndexes(item);
+        freeSize++;
         dirty = true;
     }
 
@@ -155,6 +168,8 @@
                 }
             }
             result.reset();
+            writer.updateIndexes(result);
+            freeSize--;
         }
         return result;
     }
@@ -200,6 +215,7 @@
                     lastFree = index;
                     firstFree = index;
                 }
+                freeSize++;
             }
             offset += IndexItem.INDEX_SIZE;
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?rev=669879&r1=669878&r2=669879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java Fri Jun 20 05:52:41 2008
@@ -44,6 +44,7 @@
     public static final int DEFAULT_BIN_SIZE;
     public static final int MAXIMUM_CAPACITY;
     public static final int DEFAULT_LOAD_FACTOR;
+    private static final int LOW_WATER_MARK=1024*16;
     private static final String NAME_PREFIX = "hash-index-";
     private static final Log LOG = LogFactory.getLog(HashIndex.class);
     private final String name;
@@ -67,6 +68,7 @@
     private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
     private int pageCacheSize = 10;
     private int size;
+    private int highestSize=0;
     private int activeBins;
     private int threshold;
     private int maximumCapacity=MAXIMUM_CAPACITY;
@@ -275,11 +277,14 @@
         entry.setKey((Comparable)key);
         entry.setIndexOffset(value.getOffset());
         if (!getBin(key).put(entry)) {
-            size++;
+            this.size++;
         }
-        if (size >= threshold) {
+        if (this.size >= this.threshold) {
             resize(2*bins.length);
         }
+        if(this.size > this.highestSize) {
+            this.highestSize=this.size;
+        }
     }
 
     public synchronized StoreEntry get(Object key) throws IOException {
@@ -292,14 +297,22 @@
 
     public synchronized StoreEntry remove(Object key) throws IOException {
         load();
+        StoreEntry result = null;
         HashEntry entry = new HashEntry();
         entry.setKey((Comparable)key);
-        HashEntry result = getBin(key).remove(entry);
-        if (result != null) {
-            size--;
-            return indexManager.getIndex(result.getIndexOffset());
+        HashEntry he = getBin(key).remove(entry);
+        if (he != null) {
+            this.size--;
+            result = this.indexManager.getIndex(he.getIndexOffset());
+        }
+        if (this.highestSize > LOW_WATER_MARK && this.highestSize > (this.size *2)) {
+            int newSize = this.size/this.keysPerPage;
+            newSize = Math.max(128, newSize);
+            this.highestSize=0;
+            resize(newSize);
+
         }
-        return null;
+        return result;
     }
 
     public synchronized boolean containsKey(Object key) throws IOException {
@@ -523,42 +536,53 @@
     }
 
     private void resize(int newCapacity) throws IOException {
-        if (bins.length == getMaximumCapacity()) {
-            threshold = Integer.MAX_VALUE;
-            return;
-        }
-        String backFileName = name + "-REISZE";
-        HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
-        backIndex.setKeyMarshaller(keyMarshaller);
-        backIndex.setKeySize(getKeySize());
-        backIndex.setNumberOfBins(newCapacity);
-        backIndex.setPageSize(getPageSize());
-        backIndex.load();
-        File backFile = backIndex.file;
-        long offset = 0;
-        while ((offset + pageSize) <= indexFile.length()) {
-            indexFile.seek(offset);
-            HashPage page = getFullPage(offset);
-            if (page.isActive()) {
-                for (HashEntry entry : page.getEntries()) {
-                    backIndex.getBin(entry.getKey()).put(entry);
-                    backIndex.size++;
+        if (bins.length < getMaximumCapacity()) {
+            if (newCapacity != numberOfBins) {
+                int capacity = 1;
+                while (capacity < newCapacity) {
+                    capacity <<= 1;
+                }
+                if (newCapacity != numberOfBins) {
+                    LOG.info("Resize hash bins " + this.name + " from " + numberOfBins + " to " + newCapacity);
+
+                    String backFileName = name + "-REISZE";
+                    HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
+                    backIndex.setKeyMarshaller(keyMarshaller);
+                    backIndex.setKeySize(getKeySize());
+                    backIndex.setNumberOfBins(newCapacity);
+                    backIndex.setPageSize(getPageSize());
+                    backIndex.load();
+                    File backFile = backIndex.file;
+                    long offset = 0;
+                    while ((offset + pageSize) <= indexFile.length()) {
+                        indexFile.seek(offset);
+                        HashPage page = getFullPage(offset);
+                        if (page.isActive()) {
+                            for (HashEntry entry : page.getEntries()) {
+                                backIndex.getBin(entry.getKey()).put(entry);
+                                backIndex.size++;
+                            }
+                        }
+                        page=null;
+                        offset += pageSize;
+                    }
+                    backIndex.unload();
+
+                    unload();
+                    IOHelper.deleteFile(file);
+                    IOHelper.copyFile(backFile, file);
+                    IOHelper.deleteFile(backFile);
+                    setNumberOfBins(newCapacity);
+                    bins = new HashBin[newCapacity];
+                    threshold = calculateThreashold();
+                    openIndexFile();
+                    doLoad();
                 }
             }
-            page=null;
-            offset += pageSize;
+        }else {
+            threshold = Integer.MAX_VALUE;
+            return;
         }
-        backIndex.unload();
-
-        unload();
-        IOHelper.deleteFile(file);
-        IOHelper.copyFile(backFile, file);
-        IOHelper.deleteFile(backFile);
-        setNumberOfBins(newCapacity);
-        bins = new HashBin[newCapacity];
-        threshold = calculateThreashold();
-        openIndexFile();
-        doLoad();
     }
 
     private int calculateThreashold() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=669879&r1=669878&r2=669879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Fri Jun 20 05:52:41 2008
@@ -121,7 +121,7 @@
     private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
     private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
     private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
-    private int maxReferenceFileLength=AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
+    private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
     private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
     private String directoryPath = "";
     private RandomAccessFile lockFile;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?rev=669879&r1=669878&r2=669879&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Fri Jun 20 05:52:41 2008
@@ -19,6 +19,7 @@
 import java.io.File;
 
 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.index.hash.HashIndex;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterFactory;
 import org.apache.activemq.store.ReferenceStoreAdapter;
@@ -33,7 +34,7 @@
  * @version $Revision: 1.17 $
  */
 public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
-
+    static final int DEFAULT_MAX_REFERNCE_FILE_LENGTH=2*1024*1024;
     private TaskRunnerFactory taskRunnerFactory;
     private File dataDirectory;
     private int journalThreadPriority = Thread.MAX_PRIORITY;
@@ -45,6 +46,12 @@
     private boolean useNio = true;
     private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
     private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
+    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
+    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
+    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
+    private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
+    private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
+    private int maxReferenceFileLength=DEFAULT_MAX_REFERNCE_FILE_LENGTH;
 
 
     /**
@@ -62,6 +69,12 @@
         result.setUseNio(isUseNio());
         result.setMaxFileLength(getMaxFileLength());
         result.setCleanupInterval(getCleanupInterval());
+        result.setIndexBinSize(getIndexBinSize());
+        result.setIndexKeySize(getIndexKeySize());
+        result.setIndexPageSize(getIndexPageSize());
+        result.setIndexMaxBinSize(getIndexMaxBinSize());
+        result.setIndexLoadFactor(getIndexLoadFactor());
+        result.setMaxReferenceFileLength(getMaxReferenceFileLength());
         return result;
     }
 
@@ -189,4 +202,88 @@
     public void setMaxFileLength(int maxFileLength) {
         this.maxFileLength = maxFileLength;
     }
+
+    /**
+     * @return the indexBinSize
+     */
+    public int getIndexBinSize() {
+        return indexBinSize;
+    }
+
+    /**
+     * @param indexBinSize the indexBinSize to set
+     */
+    public void setIndexBinSize(int indexBinSize) {
+        this.indexBinSize = indexBinSize;
+    }
+
+    /**
+     * @return the indexKeySize
+     */
+    public int getIndexKeySize() {
+        return indexKeySize;
+    }
+
+    /**
+     * @param indexKeySize the indexKeySize to set
+     */
+    public void setIndexKeySize(int indexKeySize) {
+        this.indexKeySize = indexKeySize;
+    }
+
+    /**
+     * @return the indexPageSize
+     */
+    public int getIndexPageSize() {
+        return indexPageSize;
+    }
+
+    /**
+     * @param indexPageSize the indexPageSize to set
+     */
+    public void setIndexPageSize(int indexPageSize) {
+        this.indexPageSize = indexPageSize;
+    }
+
+    /**
+     * @return the indexMaxBinSize
+     */
+    public int getIndexMaxBinSize() {
+        return indexMaxBinSize;
+    }
+
+    /**
+     * @param indexMaxBinSize the indexMaxBinSize to set
+     */
+    public void setIndexMaxBinSize(int indexMaxBinSize) {
+        this.indexMaxBinSize = indexMaxBinSize;
+    }
+
+    /**
+     * @return the indexLoadFactor
+     */
+    public int getIndexLoadFactor() {
+        return indexLoadFactor;
+    }
+
+    /**
+     * @param indexLoadFactor the indexLoadFactor to set
+     */
+    public void setIndexLoadFactor(int indexLoadFactor) {
+        this.indexLoadFactor = indexLoadFactor;
+    }
+
+    /**
+     * @return the maxReferenceFileLength
+     */
+    public int getMaxReferenceFileLength() {
+        return maxReferenceFileLength;
+    }
+
+    /**
+     * @param maxReferenceFileLength the maxReferenceFileLength to set
+     */
+    public void setMaxReferenceFileLength(int maxReferenceFileLength) {
+        this.maxReferenceFileLength = maxReferenceFileLength;
+    }
 }