Author: rajdavies
Date: Fri Nov 2 13:09:57 2007
New Revision: 591442
URL: http://svn.apache.org/viewvc?rev=591442&view=rev
Log:
Further enhancement to https://issues.apache.org/activemq/browse/AMQ-1246
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java?rev=591442&r1=591441&r2=591442&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java Fri Nov 2 13:09:57 2007
@@ -181,9 +181,36 @@
}
private void addHashEntry(int index, HashEntry entry) throws IOException {
- HashPageInfo page = getInsertPage(index);
- int offset = index % maximumEntries;
- page.addHashEntry(offset, entry);
+ HashPageInfo pageToUse = null;
+ int offset = 0;
+ if (index >= maximumBinSize()) {
+ HashPage hp = hashIndex.createPage(id);
+ pageToUse = addHashPageInfo(hp.getId(), 0);
+ pageToUse.setPage(hp);
+ offset = 0;
+ } else {
+
+ int count = 0;
+ int countSoFar=0;
+ int pageNo = 0;
+ for (HashPageInfo page : hashPages) {
+ count += page.size();
+ if (index < count ) {
+ offset = index - countSoFar;
+ break;
+ }
+ if (index == count && page.size()+1 <= maximumEntries) {
+ offset = page.size();
+ break;
+ }
+ countSoFar += page.size();
+ pageNo++;
+ }
+ pageToUse = hashPages.get(pageNo);
+ }
+ pageToUse.begin();
+
+ pageToUse.addHashEntry(offset, entry);
doOverFlow(index);
}
@@ -202,25 +229,12 @@
HashEntry result = page.getHashEntry(offset);
return result;
}
+
private int maximumBinSize() {
return maximumEntries * hashPages.size();
}
- private HashPageInfo getInsertPage(int index) throws IOException {
- HashPageInfo result = null;
- if (index >= maximumBinSize()) {
- HashPage page = hashIndex.createPage(id);
- result = addHashPageInfo(page.getId(), 0);
- result.setPage(page);
- } else {
- int offset = index / maximumEntries;
- result = hashPages.get(offset);
- }
- result.begin();
- return result;
- }
-
private HashPageInfo getRetrievePage(int index) throws IOException {
HashPageInfo result = null;
int count = 0;
@@ -250,16 +264,6 @@
}
return result;
}
-
-// private int getInsertPageNo(int index) {
-// int result = index / maximumEntries;
-// return result;
-// }
-//
-// private int getOffset(int index) {
-// int result = index % maximumEntries;
-// return result;
-// }
private void doOverFlow(int index) throws IOException {
int pageNo = index / maximumEntries;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java?rev=591442&r1=591441&r2=591442&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java Fri Nov 2 13:09:57 2007
@@ -40,7 +40,6 @@
private int binId;
private int persistedSize;
private List<HashEntry> hashIndexEntries;
- private static final HashEntry nullEntry = new HashEntry();
/*
* for persistence only
*/
@@ -193,11 +192,6 @@
void addHashEntry(int index, HashEntry entry) throws IOException {
// index = index >= 0 ? index : 0;
// index = (index == 0 || index< size()) ? index : size()-1;
- if (index > hashIndexEntries.size()) {
- for (int i = hashIndexEntries.size(); i < (index+1);i++) {
- hashIndexEntries.add(nullEntry);
- }
- }
hashIndexEntries.add(index, entry);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=591442&r1=591441&r2=591442&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Fri Nov 2 13:09:57 2007
@@ -55,6 +55,8 @@
private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
private static final String STORE_STATE = "store-state";
+ private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
+ private static final Integer INDEX_VERSION = new Integer(2);
private static final String RECORD_REFERENCES = "record-references";
private static final String TRANSACTIONS = "transactions-state";
private MapContainer stateMap;
@@ -67,6 +69,7 @@
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
+
public KahaReferenceStoreAdapter(AtomicLong size){
super(size);
@@ -94,12 +97,21 @@
storeValid = status.get();
}
if (storeValid) {
+ //check what version the indexes are at
+ Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME);
+ if (indexVersion==null || indexVersion.intValue() < INDEX_VERSION.intValue()) {
+ storeValid = false;
+ LOG.warn("Indexes at an older version - need to regenerate");
+ }
+ }
+ if (storeValid) {
if (stateMap.containsKey(RECORD_REFERENCES)) {
recordReferences = (Map<Integer, AtomicInteger>)stateMap.get(RECORD_REFERENCES);
}
}
}
stateMap.put(STORE_STATE, new AtomicBoolean());
+ stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
durableSubscribers = store.getListContainer("durableSubscribers");
durableSubscribers.setMarshaller(new CommandMarshaller());
preparedTransactions = store.getMapContainer("transactions", TRANSACTIONS, false);
@@ -112,6 +124,7 @@
public synchronized void stop() throws Exception {
stateMap.put(RECORD_REFERENCES, recordReferences);
stateMap.put(STORE_STATE, new AtomicBoolean(true));
+ stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
if (this.stateStore != null) {
this.stateStore.close();
this.stateStore = null;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java?rev=591442&r1=591441&r2=591442&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java Fri Nov 2 13:09:57 2007
@@ -25,15 +25,17 @@
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.activemq.util.IOHelper;
-
/**
* Test a HashIndex
*/
public class HashTest extends TestCase {
- private static final int COUNT = 1000;
+ private static final int COUNT = 10000;
+
private HashIndex hashIndex;
+
private File directory;
+
private IndexManager indexManager;
/**
@@ -44,8 +46,12 @@
super.setUp();
directory = new File(IOHelper.getDefaultDataDirectory());
directory.mkdirs();
- indexManager = new IndexManager(directory, "im-hash-test", "rw", null, new AtomicLong());
+ IOHelper.deleteChildren(directory);
+ indexManager = new IndexManager(directory, "im-hash-test", "rw", null,
+ new AtomicLong());
this.hashIndex = new HashIndex(directory, "testHash", indexManager);
+ this.hashIndex.setNumberOfBins(12);
+ this.hashIndex.setPageSize(32 * 1024);
this.hashIndex.setKeyMarshaller(Store.STRING_MARSHALLER);
}
@@ -56,7 +62,7 @@
doTest(600);
hashIndex.clear();
hashIndex.unload();
- doTest(1024 * 4);
+ doTest(128);
}
public void doTest(int pageSize) throws Exception {
@@ -66,8 +72,11 @@
doInsert(keyRoot);
checkRetrieve(keyRoot);
doRemove(keyRoot);
+
doInsert(keyRoot);
- doRemoveBackwards(keyRoot);
+ doRemoveHalf(keyRoot);
+ doInsertHalf(keyRoot);
+ checkRetrieve(keyRoot);
}
void doInsert(String keyRoot) throws Exception {
@@ -75,23 +84,41 @@
IndexItem value = indexManager.createNewIndex();
indexManager.storeIndex(value);
hashIndex.store(keyRoot + i, value);
-
}
}
void checkRetrieve(String keyRoot) throws IOException {
for (int i = 0; i < COUNT; i++) {
- IndexItem item = (IndexItem)hashIndex.get(keyRoot + i);
+ IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
assertNotNull(item);
}
}
+ void doRemoveHalf(String keyRoot) throws Exception {
+ for (int i = 0; i < COUNT; i++) {
+ if (i % 2 == 0) {
+ hashIndex.remove(keyRoot + i);
+ }
+
+ }
+ }
+
+ void doInsertHalf(String keyRoot) throws Exception {
+ for (int i = 0; i < COUNT; i++) {
+ if (i % 2 == 0) {
+ IndexItem value = indexManager.createNewIndex();
+ indexManager.storeIndex(value);
+ hashIndex.store(keyRoot + i, value);
+ }
+ }
+ }
+
void doRemove(String keyRoot) throws Exception {
for (int i = 0; i < COUNT; i++) {
hashIndex.remove(keyRoot + i);
}
for (int i = 0; i < COUNT; i++) {
- IndexItem item = (IndexItem)hashIndex.get(keyRoot + i);
+ IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
assertNull(item);
}
}
@@ -101,7 +128,7 @@
hashIndex.remove(keyRoot + i);
}
for (int i = 0; i < COUNT; i++) {
- IndexItem item = (IndexItem)hashIndex.get(keyRoot + i);
+ IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
assertNull(item);
}
}