Author: rajdavies
Date: Sun Mar 2 23:18:00 2008
New Revision: 632964
URL: http://svn.apache.org/viewvc?rev=632964&view=rev
Log:
Compress HashIndex on startup - only way to ensure the index
pages are loaded in correct order without changing the wire format
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java?rev=632964&r1=632963&r2=632964&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
Sun Mar 2 23:18:00 2008
@@ -92,7 +92,7 @@
return size;
}
- HashPageInfo addHashPageInfo(long id, int size) {
+ HashPageInfo addHashPageInfo(long id, int size) throws IOException {
HashPageInfo info = new HashPageInfo(hashIndex);
info.setId(id);
info.setSize(size);
@@ -105,7 +105,7 @@
HashEntry result = null;
try {
int low = 0;
- int high = size() - 1;
+ int high = size()-1;
while (low <= high) {
int mid = (low + high) >> 1;
HashEntry te = getHashEntry(mid);
@@ -129,7 +129,7 @@
boolean replace = false;
try {
int low = 0;
- int high = size() - 1;
+ int high = size()-1;
while (low <= high) {
int mid = (low + high) >> 1;
HashEntry midVal = getHashEntry(mid);
@@ -223,7 +223,7 @@
HashPageInfo page = getRetrievePage(index);
int offset = getRetrieveOffset(index);
HashEntry result = page.removeHashEntry(offset);
-
+
if (page.isEmpty()) {
hashPages.remove(page);
hashIndex.releasePage(page.getPage());
@@ -280,7 +280,7 @@
// overflowed
info.begin();
HashEntry entry = info.removeHashEntry(info.size() - 1);
- doOverFlow(hashPages.indexOf(info) + 1, entry);
+ doOverFlow(hashPages.indexOf(info)+1, entry);
}
}
@@ -298,13 +298,22 @@
if (info.size() > maximumEntries) {
// overflowed
HashEntry overflowed = info.removeHashEntry(info.size() - 1);
- doOverFlow(pageNo + 1, overflowed);
+ doOverFlow(pageNo+1, overflowed);
}
}
private void doUnderFlow(int index) {
}
+ String dump() throws IOException {
+ String str = "[" + hashPages.size()+"]";
+ for (HashPageInfo page : hashPages) {
+ page.begin();
+ str +=page.dump();
+ page.end();
+ }
+ return str;
+ }
private void end() throws IOException {
for (HashPageInfo info : hashPages) {
info.end();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?rev=632964&r1=632963&r2=632964&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
Sun Mar 2 23:18:00 2008
@@ -17,8 +17,10 @@
package org.apache.activemq.kaha.impl.index.hash;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.kaha.Marshaller;
@@ -65,7 +67,7 @@
private int pageCacheSize = 10;
private int size;
private int activeBins;
-
+
/**
* Constructor
@@ -198,29 +200,17 @@
readBuffer = new byte[pageSize];
try {
openIndexFile();
- long offset = 0;
- while ((offset + pageSize) <= indexFile.length()) {
- indexFile.seek(offset);
- indexFile.readFully(readBuffer, 0,
HashPage.PAGE_HEADER_SIZE);
- dataIn.restart(readBuffer);
- HashPage page = new HashPage(keysPerPage);
- page.setId(offset);
- page.readHeader(dataIn);
- if (!page.isActive()) {
- freeList.add(page);
- } else {
- addToBin(page);
- size+=page.size();
- }
- offset += pageSize;
+ if (indexFile.length() > 0) {
+ doCompress();
}
- length = offset;
} catch (IOException e) {
LOG.error("Failed to load index ", e);
throw new RuntimeException(e);
}
}
}
+
+
public synchronized void unload() throws IOException {
if (loaded.compareAndSet(true, false)) {
@@ -228,6 +218,7 @@
indexFile.close();
indexFile = null;
freeList.clear();
+ pageCache.clear();
bins = new HashBin[bins.length];
}
}
@@ -330,6 +321,7 @@
result = freeList.removeFirst();
result.setActive(true);
result.reset();
+ writePageHeader(result);
}
return result;
}
@@ -371,7 +363,7 @@
return page;
}
- void addToBin(HashPage page) {
+ void addToBin(HashPage page) throws IOException {
HashBin bin = getBin(page.getBinId());
bin.addHashPageInfo(page.getId(), page.getPersistedSize());
}
@@ -393,7 +385,7 @@
indexFile = new RandomAccessFile(file, "rw");
}
}
-
+
private HashBin getBin(Object key) {
int hash = hash(key);
int i = indexFor(hash, bins.length);
@@ -419,6 +411,61 @@
pageCache.remove(page.getId());
}
}
+
+ private void doLoad() throws IOException {
+ long offset = 0;
+ if (loaded.compareAndSet(false, true)) {
+ while ((offset + pageSize) <= indexFile.length()) {
+ indexFile.seek(offset);
+ indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
+ dataIn.restart(readBuffer);
+ HashPage page = new HashPage(keysPerPage);
+ page.setId(offset);
+ page.readHeader(dataIn);
+ if (!page.isActive()) {
+ page.reset();
+ freeList.add(page);
+ } else {
+ addToBin(page);
+ size+=page.size();
+ }
+ offset += pageSize;
+ }
+ length=offset;
+ }
+ }
+
+ private void doCompress() throws IOException {
+ String backFileName = name + "-COMPRESS";
+ HashIndex backIndex = new
HashIndex(directory,backFileName,indexManager);
+ backIndex.setKeyMarshaller(keyMarshaller);
+ backIndex.setKeySize(getKeySize());
+ backIndex.setNumberOfBins(getNumberOfBins());
+ backIndex.setPageSize(getPageSize());
+ backIndex.load();
+ File backFile = backIndex.file;
+ long offset = 0;
+ while ((offset + pageSize) <= indexFile.length()) {
+ indexFile.seek(offset);
+ HashPage page = getFullPage(offset);
+ if (page.isActive()) {
+ for (HashEntry entry : page.getEntries()) {
+ backIndex.getBin(entry.getKey()).put(entry);
+ backIndex.size++;
+ }
+ }
+ offset += pageSize;
+ }
+ backIndex.unload();
+
+ unload();
+ IOHelper.deleteFile(file);
+ IOHelper.copyFile(backFile, file);
+ IOHelper.deleteFile(backFile);
+ openIndexFile();
+ doLoad();
+ }
+
static int hash(Object x) {
int h = x.hashCode();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java?rev=632964&r1=632963&r2=632964&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
Sun Mar 2 23:18:00 2008
@@ -38,8 +38,8 @@
private int maximumEntries;
private long id;
private int binId;
- private int persistedSize;
private List<HashEntry> hashIndexEntries;
+ private int persistedSize;
/*
* for persistence only
*/
@@ -71,7 +71,7 @@
}
public String toString() {
- return "HashPage[" + getId() + ":" + binId + ":" + id+"] size = " +
hashIndexEntries.size();
+ return "HashPage[" + getId() + ":" + binId + ":" + id+"] size = " +
persistedSize;
}
public boolean equals(Object o) {
@@ -95,14 +95,7 @@
this.active = active;
}
- long getNextFreePageId() {
- return this.nextFreePageId;
- }
-
- void setNextFreePageId(long nextPageId) {
- this.nextFreePageId = nextPageId;
- }
-
+
long getId() {
return id;
}
@@ -116,8 +109,9 @@
}
void write(Marshaller keyMarshaller, DataOutput dataOut) throws
IOException {
+ persistedSize=hashIndexEntries.size();
writeHeader(dataOut);
- dataOut.writeInt(hashIndexEntries.size());
+ dataOut.writeInt(persistedSize);
for (HashEntry entry : hashIndexEntries) {
entry.write(keyMarshaller, dataOut);
}
@@ -125,7 +119,8 @@
void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException {
readHeader(dataIn);
- int size = dataIn.readInt();
+ dataIn.readInt();
+ int size = persistedSize;
hashIndexEntries.clear();
for (int i = 0; i < size; i++) {
HashEntry entry = new HashEntry();
@@ -145,8 +140,10 @@
dataOut.writeBoolean(isActive());
dataOut.writeLong(nextFreePageId);
dataOut.writeInt(binId);
- dataOut.writeInt(size());
+ persistedSize=hashIndexEntries.size();
+ dataOut.writeInt(persistedSize);
}
+
boolean isEmpty() {
return hashIndexEntries.isEmpty();
@@ -186,12 +183,10 @@
void reset() throws IOException {
hashIndexEntries.clear();
- setNextFreePageId(HashEntry.NOT_SET);
+ persistedSize=0;
}
void addHashEntry(int index, HashEntry entry) throws IOException {
- // index = index >= 0 ? index : 0;
- // index = (index == 0 || index< size()) ? index : size()-1;
hashIndexEntries.add(index, entry);
}
@@ -227,7 +222,7 @@
this.binId = binId;
}
- void dump() {
+ String dump() {
StringBuffer str = new StringBuffer(32);
str.append(toString());
@@ -236,6 +231,6 @@
str.append(entry);
str.append(",");
}
- LOG.info(str);
+ return str.toString();
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java?rev=632964&r1=632963&r2=632964&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java
Sun Mar 2 23:18:00 2008
@@ -86,8 +86,8 @@
return result;
}
- void dump() {
- page.dump();
+ String dump() {
+ return page.dump();
}
void begin() throws IOException {