Author: chirino
Date: Wed Sep 17 06:59:36 2008
New Revision: 696308
URL: http://svn.apache.org/viewvc?rev=696308&view=rev
Log:
- Checking the amount of disk space used is now much faster and more accurate.
- Removed ref counting of journal files since it was unused.
- Fixed an NPE that would occur in the BTree when getFirst/getLast was used
and the tree depth was > 1.
- We now checkpoint and clean up at two different intervals. We now checkpoint
at a much higher rate so that recovery is much faster.
- Made recovery error handling more robust.
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
Wed Sep 17 06:59:36 2008
@@ -541,7 +541,7 @@
node = node.getChild(tx, 0);
}
if( node.values.length>0 ) {
- return new KeyValueEntry(keys[0], values[0]);
+ return new KeyValueEntry(node.keys[0], node.values[0]);
} else {
return null;
}
@@ -553,8 +553,8 @@
node = node.getChild(tx, children.length-1);
}
if( node.values.length>0 ) {
- int idx = values.length-1;
- return new KeyValueEntry(keys[idx], values[idx]);
+ int idx = node.values.length-1;
+ return new KeyValueEntry(node.keys[idx], node.values[idx]);
} else {
return null;
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
Wed Sep 17 06:59:36 2008
@@ -33,14 +33,10 @@
protected final File file;
protected final Integer dataFileId;
- protected final int preferedSize;
-
protected int length;
- protected int referenceCount;
DataFile(File file, int number, int preferedSize) {
this.file = file;
- this.preferedSize = preferedSize;
this.dataFileId = Integer.valueOf(number);
length = (int)(file.exists() ? file.length() : 0);
}
@@ -65,44 +61,15 @@
length += size;
}
- public synchronized int increment() {
- return ++referenceCount;
- }
-
- public synchronized int decrement() {
- return --referenceCount;
- }
-
- public synchronized int getReferenceCount(){
- return referenceCount;
- }
-
- public synchronized boolean isUnused() {
- return referenceCount <= 0;
- }
-
public synchronized String toString() {
- String result = file.getName() + " number = " + dataFileId + " ,
length = " + length + " refCount = " + referenceCount;
- return result;
+ return file.getName() + " number = " + dataFileId + " , length = " +
length;
}
- public synchronized RandomAccessFile openRandomAccessFile(boolean
appender) throws IOException {
- RandomAccessFile rc = new RandomAccessFile(file, "rw");
- // When we start to write files size them up so that the OS has a
chance
- // to allocate the file contigously.
- if (appender) {
- if (length < preferedSize) {
- rc.setLength(preferedSize);
- }
- }
- return rc;
+ public synchronized RandomAccessFile openRandomAccessFile() throws
IOException {
+ return new RandomAccessFile(file, "rw");
}
public synchronized void closeRandomAccessFile(RandomAccessFile file)
throws IOException {
- // On close set the file size to the real size.
- if (length != file.length()) {
- file.setLength(getLength());
- }
file.close();
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
Wed Sep 17 06:59:36 2008
@@ -46,7 +46,7 @@
public DataFileAccessor(Journal dataManager, DataFile dataFile) throws
IOException {
this.dataFile = dataFile;
this.inflightWrites = dataManager.getInflightWrites();
- this.file = dataFile.openRandomAccessFile(false);
+ this.file = dataFile.openRandomAccessFile();
}
public DataFile getDataFile() {
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
Wed Sep 17 06:59:36 2008
@@ -326,10 +326,14 @@
WriteBatch wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
+ file.setLength(dataFile.getLength());
dataFile.closeRandomAccessFile(file);
}
dataFile = wb.dataFile;
- file = dataFile.openRandomAccessFile(true);
+ file = dataFile.openRandomAccessFile();
+ if( file.length() < dataManager.preferedFileLength ) {
+ file.setLength(dataManager.preferedFileLength);
+ }
}
WriteCommand write = wb.writes.getHead();
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
Wed Sep 17 06:59:36 2008
@@ -101,17 +101,9 @@
protected Location mark;
protected final AtomicReference<Location> lastAppendLocation = new
AtomicReference<Location>();
protected Runnable cleanupTask;
- protected final AtomicLong storeSize;
+ protected final AtomicLong totalLength = new AtomicLong();
protected boolean archiveDataLogs;
- public Journal(AtomicLong storeSize) {
- this.storeSize = storeSize;
- }
-
- public Journal() {
- this(new AtomicLong());
- }
-
@SuppressWarnings("unchecked")
public synchronized void start() throws IOException {
if (started) {
@@ -147,7 +139,7 @@
int num = Integer.parseInt(numStr);
DataFile dataFile = new DataFile(file, num,
preferedFileLength);
fileMap.put(dataFile.getDataFileId(), dataFile);
- storeSize.addAndGet(dataFile.getLength());
+ totalLength.addAndGet(dataFile.getLength());
} catch (NumberFormatException e) {
// Ignore file that do not match the pattern.
}
@@ -270,32 +262,19 @@
File file = new File(directory, fileName);
DataFile nextWriteFile = new DataFile(file, nextNum,
preferedFileLength);
// actually allocate the disk space
-
nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
fileByFileMap.put(file, nextWriteFile);
dataFiles.addLast(nextWriteFile);
-
- DataFile previous = dataFiles.getTail().getPrevious();
- if (previous!=null && previous.isUnused()) {
- removeDataFile(previous);
- }
}
DataFile currentWriteFile = dataFiles.getTail();
location.setOffset(currentWriteFile.getLength());
location.setDataFileId(currentWriteFile.getDataFileId().intValue());
int size = location.getSize();
currentWriteFile.incrementLength(size);
- currentWriteFile.increment();
- storeSize.addAndGet(size);
+ totalLength.addAndGet(size);
return currentWriteFile;
}
- public synchronized void removeLocation(Location location) throws
IOException {
-
- DataFile dataFile = getDataFile(location);
- dataFile.decrement();
- }
-
synchronized DataFile getDataFile(Location item) throws IOException {
Integer key = Integer.valueOf(item.getDataFileId());
DataFile dataFile = fileMap.get(key);
@@ -350,7 +329,7 @@
boolean result = true;
for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();)
{
DataFile dataFile = i.next();
- storeSize.addAndGet(-dataFile.getLength());
+ totalLength.addAndGet(-dataFile.getLength());
result &= dataFile.delete();
}
fileMap.clear();
@@ -369,40 +348,6 @@
return result;
}
- public synchronized void addInterestInFile(int file) throws IOException {
- if (file >= 0) {
- Integer key = Integer.valueOf(file);
- DataFile dataFile = fileMap.get(key);
- if (dataFile == null) {
- throw new IOException("That data file does not exist");
- }
- addInterestInFile(dataFile);
- }
- }
-
- synchronized void addInterestInFile(DataFile dataFile) {
- if (dataFile != null) {
- dataFile.increment();
- }
- }
-
- public synchronized void removeInterestInFile(int file) throws IOException
{
- if (file >= 0) {
- Integer key = Integer.valueOf(file);
- DataFile dataFile = fileMap.get(key);
- removeInterestInFile(dataFile);
- }
-
- }
-
- synchronized void removeInterestInFile(DataFile dataFile) throws
IOException {
- if (dataFile != null) {
- if (dataFile.decrement() <= 0) {
- removeDataFile(dataFile);
- }
- }
- }
-
public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse,
Integer lastFile) throws IOException {
Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
unUsed.removeAll(inUse);
@@ -422,33 +367,11 @@
}
}
- public synchronized void consolidateDataFiles() throws IOException {
- List<DataFile> purgeList = new ArrayList<DataFile>();
- for (DataFile dataFile : fileMap.values()) {
- if (dataFile.isUnused()) {
- purgeList.add(dataFile);
- }
- }
- for (DataFile dataFile : purgeList) {
- removeDataFile(dataFile);
- }
- }
-
- private synchronized void removeDataFile(DataFile dataFile) throws
IOException {
-
- // Make sure we don't delete too much data.
- if (dataFile == dataFiles.getTail() || mark == null ||
dataFile.getDataFileId() >= mark.getDataFileId()) {
- LOG.debug("Won't remove DataFile" + dataFile);
- return;
- }
- forceRemoveDataFile(dataFile);
- }
-
private synchronized void forceRemoveDataFile(DataFile dataFile) throws
IOException {
accessorPool.disposeDataFileAccessors(dataFile);
fileByFileMap.remove(dataFile.getFile());
fileMap.remove(dataFile.getDataFileId());
- storeSize.addAndGet(-dataFile.getLength());
+ totalLength.addAndGet(-dataFile.getLength());
dataFile.unlink();
if (archiveDataLogs) {
dataFile.move(getDirectoryArchive());
@@ -706,25 +629,20 @@
return fileByFileMap.keySet();
}
- synchronized public long getDiskSize() {
- long rc = 0;
- DataFile cur = dataFiles.getHead();
- while (cur != null) {
- rc += cur.getLength();
- cur = cur.getNext();
- }
- return rc;
- }
-
- synchronized public long getDiskSizeUntil(Location startPosition) {
- long rc = 0;
- DataFile cur = dataFiles.getHead();
- while (cur != null) {
- if (cur.getDataFileId().intValue() >=
startPosition.getDataFileId()) {
- return rc + startPosition.getOffset();
+ public long getDiskSize() {
+ long tailLength=0;
+ synchronized( this ) {
+ if( !dataFiles.isEmpty() ) {
+ tailLength = dataFiles.getTail().getLength();
}
- rc += cur.getLength();
- cur = cur.getNext();
+ }
+
+ long rc = totalLength.get();
+
+ // The last file is actually at a minimum preferedFileLength big.
+ if( tailLength < preferedFileLength ) {
+ rc -= tailLength;
+ rc += preferedFileLength;
}
return rc;
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
Wed Sep 17 06:59:36 2008
@@ -84,10 +84,14 @@
WriteBatch wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
+ file.setLength(dataFile.getLength());
dataFile.closeRandomAccessFile(file);
}
dataFile = wb.dataFile;
- file = dataFile.openRandomAccessFile(true);
+ file = dataFile.openRandomAccessFile();
+ if( file.length() < dataManager.preferedFileLength ) {
+ file.setLength(dataManager.preferedFileLength);
+ }
channel = file.getChannel();
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
Wed Sep 17 06:59:36 2008
@@ -32,17 +32,8 @@
super(file, number, preferedSize);
}
-
public RandomAccessFile openRandomAccessFile(boolean appender) throws
IOException {
- RandomAccessFile rc = new RandomAccessFile(file, "r");
- // When we start to write files size them up so that the OS has a
chance
- // to allocate the file contigously.
- if (appender) {
- if (length < preferedSize) {
- rc.setLength(preferedSize);
- }
- }
- return rc;
+ return new RandomAccessFile(file, "r");
}
public void closeRandomAccessFile(RandomAccessFile file) throws
IOException {
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
Wed Sep 17 06:59:36 2008
@@ -67,7 +67,7 @@
int num = Integer.parseInt(numStr);
DataFile dataFile = new ReadOnlyDataFile(file, num,
preferedFileLength);
fileMap.put(dataFile.getDataFileId(), dataFile);
- storeSize.addAndGet(dataFile.getLength());
+ totalLength.addAndGet(dataFile.getLength());
} catch (NumberFormatException e) {
// Ignore file that do not match the pattern.
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
(original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Wed Sep 17 06:59:36 2008
@@ -111,7 +111,7 @@
// Will we sync writes to disk. Ensures that data will not be lost after a
checkpoint()
private boolean enableSyncedWrites=true;
// Will writes be done in an async thread?
- private boolean enableAsyncWrites=true;
+ private boolean enableAsyncWrites=false;
// These are used if enableAsyncWrites==true
private AtomicBoolean stopWriter = new AtomicBoolean();
@@ -122,7 +122,7 @@
private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
// Keeps track of free pages.
- private long nextFreePageId;
+ private final AtomicLong nextFreePageId = new AtomicLong();
private SequenceSet freeList = new SequenceSet();
private AtomicLong nextTxid = new AtomicLong();
@@ -345,7 +345,7 @@
if( writeFile.length() < PAGE_FILE_HEADER_SIZE) {
writeFile.setLength(PAGE_FILE_HEADER_SIZE);
}
- nextFreePageId=(writeFile.length()-PAGE_FILE_HEADER_SIZE)/pageSize;
+
nextFreePageId.set((writeFile.length()-PAGE_FILE_HEADER_SIZE)/pageSize);
startWriter();
} else {
@@ -662,14 +662,14 @@
}
public long getDiskSize() throws IOException {
- return readFile.length();
+ return toOffset(nextFreePageId.get());
}
/**
* @return the number of pages allocated in the PageFile
*/
public long getPageCount() {
- return nextFreePageId;
+ return nextFreePageId.get();
}
public int getRecoveryFileMinPageCount() {
@@ -732,7 +732,7 @@
Page<T> first = null;
int c = count;
while (c > 0) {
- Page<T> page = new Page<T>(nextFreePageId++);
+ Page<T> page = new Page<T>(nextFreePageId.getAndIncrement());
page.makeFree(getNextWriteTransactionId());
if (first == null) {
@@ -1039,9 +1039,10 @@
checksum.update(data, 0, pageSize);
batch.put(offset, data);
}
- } catch (IllegalStateException e) {
+ } catch (Exception e) {
// If an error occurred it was cause the redo buffer was not full
written out correctly.. so don't redo it.
// as the pages should still be consistent.
+ LOG.debug("Redo buffer was not fully intact: ", e);
return nextTxId;
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
Wed Sep 17 06:59:36 2008
@@ -460,11 +460,7 @@
return 0;
}
try {
- long diskSize;
- synchronized(indexMutex) {
- diskSize = pageFile.getDiskSize();
- }
- return asyncDataManager.getDiskSize() + diskSize;
+ return asyncDataManager.getDiskSize() + pageFile.getDiskSize();
} catch (IOException e) {
throw new RuntimeException(e);
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
Wed Sep 17 06:59:36 2008
@@ -151,7 +151,8 @@
protected boolean recovering;
protected Thread checkpointThread;
protected boolean syncWrites;
- int checkpointInterval = 30*1000;
+ int checkpointInterval = 1000;
+ int cleanupInterval = 30*1000;
protected AtomicBoolean started = new AtomicBoolean();
@@ -233,6 +234,7 @@
metadata.destinations.load(tx);
}
});
+ pageFile.flush();
// Load up all the destinations since we need to scan all the
indexes to figure out which journal files can be deleted.
// Perhaps we should just keep an index of file
@@ -256,13 +258,22 @@
checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
public void run() {
try {
- long start = System.currentTimeMillis();
+ long lastCleanup = System.currentTimeMillis();
+ long lastCheckpoint = System.currentTimeMillis();
+
+ // Sleep for a short time so we can periodically check
+ // to see if we need to exit this thread.
+ long sleepTime = Math.min(checkpointInterval, 500);
while (started.get()) {
- Thread.sleep(500);
+ Thread.sleep(sleepTime);
long now = System.currentTimeMillis();
- if( now - start >= checkpointInterval ) {
- checkpoint();
- start = now;
+ if( now - lastCleanup >= cleanupInterval ) {
+ checkpoint(true);
+ lastCleanup = now;
+ lastCheckpoint = now;
+ } else if( now - lastCheckpoint >= checkpointInterval
) {
+ checkpoint(false);
+ lastCheckpoint = now;
}
}
} catch (InterruptedException e) {
@@ -363,12 +374,12 @@
LOG.info("Replayed " + redoCounter + " operations from redo log in " +
((end - start) / 1000.0f) + " seconds.");
}
- private void checkpoint() {
+ private void checkpoint(final boolean cleanup) {
try {
synchronized (indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- checkpointUpdate(tx);
+ checkpointUpdate(tx, cleanup);
}
});
pageFile.flush();
@@ -674,58 +685,57 @@
* @param tx
* @throws IOException
*/
- private void checkpointUpdate(Transaction tx) throws IOException {
+ private void checkpointUpdate(Transaction tx, boolean cleanup) throws
IOException {
LOG.debug("Checkpoint started.");
-
- // Find empty journal files to remove.
- final HashSet<Integer> inUseFiles = new HashSet<Integer>();
-
- for (StoredDestination sd : storedDestinations.values()) {
- // Use a visitor to cut down the number of pages that we load
- sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
- int last=-1;
- public boolean isInterestedInKeysBetween(Location first,
Location second) {
- if( second!=null ) {
- if( last+1 == second.getDataFileId() ) {
- last++;
- inUseFiles.add(last);
- }
- if( last == second.getDataFileId() ) {
- return false;
- }
- }
- return true;
- }
-
- public void visit(List<Location> keys, List<Long> values) {
- for (int i = 0; i < keys.size(); i++) {
- if( last != keys.get(i).getDataFileId() ) {
- inUseFiles.add(keys.get(i).getDataFileId());
- last = keys.get(i).getDataFileId();
- }
- }
-
- }
-
- });
- }
-
metadata.state = OPEN_STATE;
metadata.firstInProgressTransactionLocation =
getFirstInProgressTxLocation();
tx.store(metadata.page, metadataMarshaller, true);
+ pageFile.flush();
- Location l = metadata.lastUpdate;
- if( metadata.firstInProgressTransactionLocation!=null ) {
- l = metadata.firstInProgressTransactionLocation;
+ if( cleanup ) {
+ // Find empty journal files to remove.
+ final HashSet<Integer> inUseFiles = new HashSet<Integer>();
+ for (StoredDestination sd : storedDestinations.values()) {
+ // Use a visitor to cut down the number of pages that we load
+ sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
+ int last=-1;
+ public boolean isInterestedInKeysBetween(Location first,
Location second) {
+ if( second!=null ) {
+ if( last+1 == second.getDataFileId() ) {
+ last++;
+ inUseFiles.add(last);
+ }
+ if( last == second.getDataFileId() ) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void visit(List<Location> keys, List<Long> values) {
+ for (int i = 0; i < keys.size(); i++) {
+ if( last != keys.get(i).getDataFileId() ) {
+ inUseFiles.add(keys.get(i).getDataFileId());
+ last = keys.get(i).getDataFileId();
+ }
+ }
+
+ }
+
+ });
+ }
+
+ Location l = metadata.lastUpdate;
+ if( metadata.firstInProgressTransactionLocation!=null ) {
+ l = metadata.firstInProgressTransactionLocation;
+ }
+
+ LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l);
+ asyncDataManager.consolidateDataFilesNotIn(inUseFiles,
l==null?null:l.getDataFileId());
}
- LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l);
-
- pageFile.flush();
- asyncDataManager.consolidateDataFilesNotIn(inUseFiles,
l==null?null:l.getDataFileId());
-
LOG.debug("Checkpoint done.");
}
@@ -1131,4 +1141,12 @@
this.checkpointInterval = checkpointInterval;
}
+ public int getCleanupInterval() {
+ return cleanupInterval;
+ }
+
+ public void setCleanupInterval(int cleanupInterval) {
+ this.cleanupInterval = cleanupInterval;
+ }
+
}
Modified:
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
(original)
+++
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
Wed Sep 17 06:59:36 2008
@@ -29,7 +29,7 @@
protected void configureBroker(BrokerService answer,String uri) throws
Exception {
File dataFileDir = new File("target/test-amq-data/perfTest/amqdb");
dataFileDir.mkdirs();
- answer.setDeleteAllMessagesOnStartup(true);
+ // answer.setDeleteAllMessagesOnStartup(true);
KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor();
adaptor.setDirectory(dataFileDir);