Author: chirino
Date: Mon Sep 8 07:25:27 2008
New Revision: 693109
URL: http://svn.apache.org/viewvc?rev=693109&view=rev
Log:
Refactored the PageFile/Transaction stuff a bit to make it easier to maintain.
At this time an interface is not needed for Transaction.
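The shape of the change: Transaction is no longer an interface implemented by a PageFile inner class; it is now a concrete class that delegates back to the PageFile that created it. A minimal usage sketch of the refactored API follows, assuming a loaded PageFile named "pageFile", a Marshaller<String> named "stringMarshaller", and the Closure callback nested in Transaction as the diff below suggests; these names are illustrative, not part of this commit.

    import java.io.IOException;

    import org.apache.kahadb.Marshaller;
    import org.apache.kahadb.page.Page;
    import org.apache.kahadb.page.PageFile;
    import org.apache.kahadb.page.Transaction;

    public class TxSketch {
        public static void writePage(PageFile pageFile, final Marshaller<String> stringMarshaller) throws IOException {
            // tx() now hands back the concrete Transaction class instead of
            // the old PageFileTransaction implementation of an interface.
            Transaction tx = pageFile.tx();
            tx.execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    Page<String> page = tx.allocate();       // claim a free page
                    page.set("hello");
                    tx.store(page, stringMarshaller, false); // no overflow chaining
                }
            });
        }
    }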
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=693109&r1=693108&r2=693109&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Mon Sep 8 07:25:27 2008
@@ -20,21 +20,17 @@
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.InterruptedIOException;
-import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
@@ -44,14 +40,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.Marshaller;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
-import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;
/**
@@ -78,32 +69,32 @@
private final String name;
private File directory;
- private RandomAccessFile readFile;
+ RandomAccessFile readFile;
private RandomAccessFile writeFile;
- private int pageSize = DEFAULT_PAGE_SIZE;
+ int pageSize = DEFAULT_PAGE_SIZE;
private int recoveryBufferSize=(this.pageSize+RECOVERY_HEADER_SIZE)*MAX_PAGES_IN_RECOVERY_BUFFER;
private int initialPageOffset;
- private long nextFreePageId;
+ long nextFreePageId;
- private SequenceSet freeList = new SequenceSet();
- private AtomicBoolean loaded = new AtomicBoolean();
+ SequenceSet freeList = new SequenceSet();
+ AtomicBoolean loaded = new AtomicBoolean();
private LRUCache<Long, Page> pageCache;
private boolean enableRecoveryBuffer=false;
private boolean enableSyncedWrites=false;
private boolean enablePageCaching=true;
- private boolean enableAsyncWrites=false;
+ boolean enableAsyncWrites=false;
private int pageCacheSize = 100;
- private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
+ TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
private Thread writerThread;
AtomicBoolean stopWriter = new AtomicBoolean();
private CountDownLatch checkpointLatch;
- private AtomicLong nextTxid = new AtomicLong();
+ AtomicLong nextTxid = new AtomicLong();
private MetaData metaData;
/**
@@ -167,7 +158,7 @@
/**
* Internally used by the double write buffer implementation used in this class.
*/
- private class PageWrite<T> {
+ class PageWrite<T> {
Page<T> page;
byte[] current;
byte[] diskBound;
@@ -201,567 +192,9 @@
}
- /**
- * Provides transaction update access to the PageFile. All operations that modify
- * the PageFile are done via a Transaction.
- */
- class PageFileTransaction implements Transaction {
-
- /**
- * @see org.apache.kahadb.page.Transaction#getPageFile()
- */
- public PageFile getPageFile() {
- return PageFile.this;
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#allocate()
- */
- public <T> Page<T> allocate() throws IOException {
- return allocate(1);
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#allocate(int)
- */
- public <T> Page<T> allocate(int count) throws IOException {
- assertLoaded();
- if ( count <= 0 ) {
- throw new IllegalArgumentException("The allocation count must be larger than zero");
- }
-
- Sequence seq = freeList.removeFirstSequence(count);
-
- // We may need to create new free pages...
- if(seq==null) {
-
- Page<T> first=null;
- int c=count;
- while( c > 0 ) {
- Page<T> page = new Page<T>(nextFreePageId ++);
- page.makeFree(nextTxid.get());
-
- if( first == null ) {
- first = page;
- }
-
- addToCache(page);
- DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
- page.write(out);
- write(page, out.getData());
-
-// LOG.debug("allocate writing: "+page.getPageId());
- c--;
- }
-
- return first;
- }
-
- Page<T> page = new Page<T>(seq.getFirst());
- page.makeFree(0);
-// LOG.debug("allocated: "+page.getPageId());
- return page;
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#free(long)
- */
- public void free(long pageId) throws IOException {
- free(load(pageId, null));
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#free(long, int)
- */
- public void free(long pageId, int count) throws IOException {
- free(load(pageId, null), count);
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#free(org.apache.kahadb.page.Page, int)
- */
- public <T> void free(Page<T> page, int count) throws IOException {
- assertLoaded();
- long offsetPage=page.getPageId();
- for (int i = 0; i < count; i++) {
- if( page == null ) {
- page=load(offsetPage+i, null);
- }
- free(page);
- page=null;
- }
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#free(org.apache.kahadb.page.Page)
- */
- public <T> void free(Page<T> page) throws IOException {
- assertLoaded();
-
- // We may need to loop to free up a page chain.
- while(page!=null){
-
- // Is it already free??
- if( page.getType() == Page.PAGE_FREE_TYPE ) {
- return;
- }
-
- Page<T> next = null;
- if( page.getType()==Page.PAGE_PART_TYPE ) {
- next = load(page.getNext(), null);
- }
-
- page.makeFree(nextTxid.get());
-
- DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
- page.write(out);
- write(page, out.getData());
-
- removeFromCache(page);
- freeList.add(page.getPageId());
- page = next;
- }
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#store(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller, boolean)
- */
- public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
- DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
- if( marshaller!=null ) {
- marshaller.writePayload(page.get(), out);
- }
- out.close();
- }
-
- /**
- * @throws IOException
- */
- public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
- assertLoaded();
-
- // Copy to protect against the end user changing
- // the page instance while we are doing a write.
- final Page copy = page.copy();
- addToCache(copy);
-
- //
- // To support writing VERY large data, we override the output stream so that
- // we do the page writes incrementally while the data is being marshalled.
- DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize*2) {
- Page current = copy;
-
- @SuppressWarnings("unchecked")
- @Override
- protected void onWrite() throws IOException {
-
- // Are we at an overflow condition?
- if( pos >= pageSize ) {
- // If overflow is allowed
- if( overflow ) {
-
- Page next;
- if( current.getType() == Page.PAGE_PART_TYPE ) {
- next = load(current.getNext(), null);
- } else {
- next = allocate();
- }
-
- next.txId = current.txId;
-
- // Write the page header
- int oldPos = pos;
- pos = 0;
-
- current.makePagePart(next.getPageId(), nextTxid.get());
- current.write(this);
-
- // Do the page write..
- byte [] data = new byte[pageSize];
- System.arraycopy(buf, 0, data, 0, pageSize);
- PageFileTransaction.this.write(current, data);
-
- // Reset for the next page chunk
- pos = 0;
- // The page header marshalled after the data is written.
- skip(Page.PAGE_HEADER_SIZE);
- // Move the overflow data after the header.
- System.arraycopy(buf, pageSize, buf, pos, oldPos-pageSize);
- pos += oldPos-pageSize;
- current = next;
-
- } else {
- throw new PageOverflowIOException("Page overflow.");
- }
- }
-
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void close() throws IOException {
- super.close();
-
- // We need to free up the rest of the page chain..
- if( current.getType() == Page.PAGE_PART_TYPE ) {
- free(current.getNext());
- }
-
- current.makePageEnd(pos, nextTxid.get());
-
- // Write the header..
- pos = 0;
- current.write(this);
-
- PageFileTransaction.this.write(current, buf);
- }
- };
-
- // The page header marshaled after the data is written.
- out.skip(Page.PAGE_HEADER_SIZE);
- return out;
- }
-
- /**
- * @param page
- * @param data
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- private <T> void write(final Page<T> page, byte[] data) throws IOException {
- Long key = page.getPageId();
- synchronized( writes ) {
- // If it's not in the write cache...
- PageWrite<T> write = writes.get(key);
- if( write==null ) {
- write = new PageWrite<T>(page, data);
- writes.put(key, write);
- } else {
- write.setCurrent(page, data);
- }
-
- // Once we start approaching capacity, notify the writer to start writing
- if( canStartWriteBatch() ) {
- if( enableAsyncWrites ) {
- writes.notify();
- } else {
- while( canStartWriteBatch() ) {
- writeBatch(-1, TimeUnit.MILLISECONDS);
- }
- }
- }
- }
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#load(long, org.apache.kahadb.Marshaller)
- */
- public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
- assertLoaded();
- Page<T> page = new Page<T>(pageId);
- load(page, marshaller);
- return page;
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
- */
- public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
- assertLoaded();
-
- // Can't load invalid offsets...
- if (page.getPageId() < 0) {
- throw new Transaction.InvalidPageIOException("Page id is not valid", page.getPageId());
- }
-
- // Try to load it from the cache first...
- Page<T> t = getFromCache(page.getPageId());
- if (t != null) {
- page.copy(t);
- return;
- }
-
- if( marshaller!=null ) {
- // Full page read..
- InputStream is = openInputStream(page);
- DataInputStream dataIn = new DataInputStream(is);
- page.set(marshaller.readPayload(dataIn));
- is.close();
- } else {
- // Page header read.
- DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
- readFile.seek(toOffset(page.getPageId()));
- readFile.readFully(in.getRawData(), 0, Page.PAGE_HEADER_SIZE);
- page.read(in);
- page.set(null);
- }
-
- // Cache it.
- if( marshaller!=null ) {
- addToCache(page);
- }
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
- */
- public InputStream openInputStream(final Page p) throws IOException {
-
- return new InputStream() {
-
- private ByteSequence chunk = new ByteSequence(new byte[pageSize]);
- private Page page = readPage(p);
- private int pageCount=1;
-
- private Page markPage;
- private ByteSequence markChunk;
-
- private Page readPage(Page page) throws IOException {
- // Read the page data
- readFile.seek(toOffset(page.getPageId()));
- readFile.readFully(chunk.getData(), 0, pageSize);
- chunk.setOffset(0);
- chunk.setLength(pageSize);
-
- DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
- page.read(in);
-
- chunk.setOffset(Page.PAGE_HEADER_SIZE);
- if( page.getType() == Page.PAGE_END_TYPE ) {
- chunk.setLength((int)(page.getNext()));
- }
-
- if( page.getType() == Page.PAGE_FREE_TYPE ) {
- throw new EOFException("Chunk stream does not exist at page: "+page.getPageId());
- }
-
- return page;
- }
-
- public int read() throws IOException {
- if (!atEOF()) {
- return chunk.data[chunk.offset++] & 0xff;
- } else {
- return -1;
- }
- }
-
- private boolean atEOF() throws IOException {
- if( chunk.offset < chunk.length ) {
- return false;
- }
- if( page.getType() == Page.PAGE_END_TYPE ) {
- return true;
- }
- fill();
- return chunk.offset >= chunk.length;
- }
-
- private void fill() throws IOException {
- page = readPage(new Page(page.getNext()));
- pageCount++;
- }
-
- public int read(byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
-
- public int read(byte b[], int off, int len) throws IOException {
- if (!atEOF()) {
- int rc=0;
- while(!atEOF() && rc < len) {
- len = Math.min(len, chunk.length - chunk.offset);
- if (len > 0) {
- System.arraycopy(chunk.data, chunk.offset, b, off, len);
- chunk.offset += len;
- }
- rc+=len;
- }
- return rc;
- } else {
- return -1;
- }
- }
-
- public long skip(long len) throws IOException {
- if (atEOF()) {
- int rc=0;
- while(!atEOF() && rc < len) {
- len = Math.min(len, chunk.length - chunk.offset);
- if (len > 0) {
- chunk.offset += len;
- }
- rc+=len;
- }
- return rc;
- } else {
- return -1;
- }
- }
-
- public int available() {
- return chunk.length - chunk.offset;
- }
-
- public boolean markSupported() {
- return true;
- }
-
- public void mark(int markpos) {
- markPage = page;
- byte data[] = new byte[pageSize];
- System.arraycopy(chunk.getData(), 0, data, 0, pageSize);
- markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
- }
-
- public void reset() {
- page = markPage;
- chunk = markChunk;
- }
-
- };
- }
-
-
- /**
- * @see org.apache.kahadb.page.Transaction#iterator()
- */
- @SuppressWarnings("unchecked")
- public Iterator<Page> iterator() {
- return (Iterator<Page>)iterator(false);
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#iterator(boolean)
- */
- public Iterator<Page> iterator(final boolean includeFreePages) {
-
- assertLoaded();
-
- return new Iterator<Page>() {
- long nextId;
- Page nextPage;
- Page lastPage;
-
- private void findNextPage() {
- if( !loaded.get() ) {
- throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
- }
-
- if( nextPage!=null ) {
- return;
- }
-
- try {
- while( nextId < PageFile.this.nextFreePageId ) {
-
- Page page = load(nextId, null);
-
- if( includeFreePages || page.getType()!=Page.PAGE_FREE_TYPE ) {
- nextPage = page;
- return;
- } else {
- nextId++;
- }
- }
- } catch (IOException e) {
- }
- }
-
- public boolean hasNext() {
- findNextPage();
- return nextPage !=null;
- }
-
- public Page next() {
- findNextPage();
- if( nextPage !=null ) {
- lastPage = nextPage;
- nextPage=null;
- nextId++;
- return lastPage;
- } else {
- throw new NoSuchElementException();
- }
- }
-
- public void remove() {
- if( lastPage==null ) {
- throw new IllegalStateException();
- }
- try {
- free(lastPage);
- lastPage=null;
- } catch (IOException e) {
- new RuntimeException(e);
- }
- }
- };
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#commit()
- */
- public void commit() throws IOException {
- }
-
- /**
- * Rolls back the transaction.
- */
- private void rollback() throws IOException {
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#execute(org.apache.kahadb.page.PageFile.Closure)
- */
- public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
- boolean success=false;
- try {
- closure.execute(this);
- success=true;
- } finally {
- if( success ) {
- commit();
- } else {
- rollback();
- }
- }
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#execute(org.apache.kahadb.page.PageFile.CallableClosure)
- */
- public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
- boolean success=false;
- try {
- R rc = closure.execute(this);
- success=true;
- return rc;
- } finally {
- if( success ) {
- commit();
- } else {
- rollback();
- }
- }
- }
-
- /**
- * @see org.apache.kahadb.page.Transaction#isReadOnly()
- */
- public boolean isReadOnly() {
- return false;
- }
-
- public long getPageCount() {
- return nextFreePageId;
- }
-
- }
-
public Transaction tx() {
assertLoaded();
- return new PageFileTransaction();
+ return new Transaction(this);
}
/**
@@ -977,7 +410,7 @@
// Internal Double write implementation follows...
///////////////////////////////////////////////////////////////////
- private boolean canStartWriteBatch() {
+ boolean canStartWriteBatch() {
int capacityUsed = ((writes.size() * 100)/MAX_PAGES_IN_RECOVERY_BUFFER);
if( enableAsyncWrites ) {
@@ -999,7 +432,7 @@
* @throws InterruptedException
* @throws IOException
*/
- private boolean writeBatch(long timeout, TimeUnit unit) throws IOException {
+ boolean writeBatch(long timeout, TimeUnit unit) throws IOException {
int batchLength=8+4; // Account for the: lastTxid + recovery record counter
ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
@@ -1212,14 +645,14 @@
return new File(directory, IOHelper.toFileSystemSafeName(name)+".fre");
}
- private long toOffset(long pageId) {
+ long toOffset(long pageId) {
return initialPageOffset+(pageId*pageSize);
}
/**
* @throws IllegalStateException if the page file is not loaded.
*/
- private void assertLoaded() throws IllegalStateException {
+ void assertLoaded() throws IllegalStateException {
if( !loaded.get() ) {
throw new IllegalStateException("PageFile is not loaded");
}
@@ -1229,8 +662,7 @@
// Internal Cache Related operations
///////////////////////////////////////////////////////////////////
- @SuppressWarnings("unchecked")
- private <T> Page<T> getFromCache(long pageId) {
+ @SuppressWarnings("unchecked") <T> Page<T> getFromCache(long pageId) {
synchronized(writes) {
PageWrite<T> pageWrite = writes.get(pageId);
if( pageWrite != null ) {
@@ -1245,13 +677,13 @@
return result;
}
- private void addToCache(Page page) {
+ void addToCache(Page page) {
if (enablePageCaching) {
pageCache.put(page.getPageId(), page);
}
}
- private void removeFromCache(Page page) {
+ void removeFromCache(Page page) {
if (enablePageCaching) {
pageCache.remove(page.getPageId());
}
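The PageFile side of the change is mostly a visibility widening: fields and helpers that the old inner class reached directly move from private to package-private, so the extracted Transaction, now a top-level class in the same package, can keep using them. A condensed sketch of the pattern, with members abridged from the diff above (not a compilable excerpt of the real class):

    package org.apache.kahadb.page;

    import java.io.RandomAccessFile;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    public class PageFile {
        // Widened from private to package-private for Transaction's benefit.
        RandomAccessFile readFile;
        int pageSize = 4 * 1024;          // stand-in for DEFAULT_PAGE_SIZE
        long nextFreePageId;
        AtomicBoolean loaded = new AtomicBoolean();
        AtomicLong nextTxid = new AtomicLong();

        void assertLoaded() throws IllegalStateException {
            if (!loaded.get()) {
                throw new IllegalStateException("PageFile is not loaded");
            }
        }

        public Transaction tx() {
            assertLoaded();
            return new Transaction(this); // delegates back to this PageFile
        }
    }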
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=693109&r1=693108&r2=693109&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Mon Sep 8 07:25:27 2008
@@ -16,18 +16,27 @@
*/
package org.apache.kahadb.page;
+import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.page.PageFile.PageWrite;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.Sequence;
/**
- * The interface used to read/update a PageFile object. Using a transaction allows you to
+ * The class used to read/update a PageFile object. Using a transaction allows you to
* do multiple update operations in a single unit of work.
*/
-public interface Transaction extends Iterable<Page> {
+public class Transaction implements Iterable<Page> {
/**
*
@@ -70,7 +79,22 @@
public R execute(Transaction tx) throws T;
}
- public PageFile getPageFile();
+
+ private final PageFile pageFile;
+
+ /**
+ * @param pageFile
+ */
+ Transaction(PageFile pageFile) {
+ this.pageFile = pageFile;
+ }
+
+ /**
+ * @see org.apache.kahadb.page.Transaction#getPageFile()
+ */
+ public PageFile getPageFile() {
+ return this.pageFile;
+ }
/**
* Allocates a free page that you can write data to.
@@ -81,7 +105,9 @@
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- public <T> Page<T> allocate() throws IOException;
+ public <T> Page<T> allocate() throws IOException {
+ return allocate(1);
+ }
/**
* Allocates a block of free pages that you can write data to.
@@ -93,7 +119,44 @@
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- public <T> Page<T> allocate(int count) throws IOException;
+ public <T> Page<T> allocate(int count) throws IOException {
+ this.pageFile.assertLoaded();
+ if (count <= 0) {
+ throw new IllegalArgumentException("The allocation count must be larger than zero");
+ }
+
+ Sequence seq = this.pageFile.freeList.removeFirstSequence(count);
+
+ // We may need to create new free pages...
+ if (seq == null) {
+
+ Page<T> first = null;
+ int c = count;
+ while (c > 0) {
+ Page<T> page = new Page<T>(this.pageFile.nextFreePageId++);
+ page.makeFree(this.pageFile.nextTxid.get());
+
+ if (first == null) {
+ first = page;
+ }
+
+ this.pageFile.addToCache(page);
+ DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize);
+ page.write(out);
+ write(page, out.getData());
+
+ // LOG.debug("allocate writing: "+page.getPageId());
+ c--;
+ }
+
+ return first;
+ }
+
+ Page<T> page = new Page<T>(seq.getFirst());
+ page.makeFree(0);
+ // LOG.debug("allocated: "+page.getPageId());
+ return page;
+ }
/**
* Frees up a previously allocated page so that it can be re-allocated again.
@@ -104,18 +167,24 @@
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- public <T> void free(Page<T> page) throws IOException;
+ public void free(long pageId) throws IOException {
+ free(load(pageId, null));
+ }
/**
- * Frees up a previously allocated page so that it can be re-allocated again.
+ * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
+ *
+ * @param page the initial page of the sequence that will be getting freed
+ * @param count the number of pages in the sequence
*
- * @param page the page to free up
* @throws IOException
* If a disk error occurred.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- public void free(long pageId) throws IOException;
+ public void free(long pageId, int count) throws IOException {
+ free(load(pageId, null), count);
+ }
/**
* Frees up a previously allocated sequence of pages so that it can be re-allocated again.
@@ -128,20 +197,54 @@
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- public <T> void free(Page<T> page, int count) throws IOException;
+ public <T> void free(Page<T> page, int count) throws IOException {
+ this.pageFile.assertLoaded();
+ long offsetPage = page.getPageId();
+ for (int i = 0; i < count; i++) {
+ if (page == null) {
+ page = load(offsetPage + i, null);
+ }
+ free(page);
+ page = null;
+ }
+ }
/**
- * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
- *
- * @param page the initial page of the sequence that will be getting freed
- * @param count the number of pages in the sequence
+ * Frees up a previously allocated page so that it can be re-allocated again.
*
+ * @param page the page to free up
* @throws IOException
* If a disk error occurred.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- public void free(long pageId, int count) throws IOException;
+ public <T> void free(Page<T> page) throws IOException {
+ this.pageFile.assertLoaded();
+
+ // We may need to loop to free up a page chain.
+ while (page != null) {
+
+ // Is it already free??
+ if (page.getType() == Page.PAGE_FREE_TYPE) {
+ return;
+ }
+
+ Page<T> next = null;
+ if (page.getType() == Page.PAGE_PART_TYPE) {
+ next = load(page.getNext(), null);
+ }
+
+ page.makeFree(this.pageFile.nextTxid.get());
+
+ DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize);
+ page.write(out);
+ write(page, out.getData());
+
+ this.pageFile.removeFromCache(page);
+ this.pageFile.freeList.add(page.getPageId());
+ page = next;
+ }
+ }
/**
*
@@ -160,7 +263,135 @@
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- public <T> void store(Page<T> page, Marshaller<T> marshaller, boolean overflow) throws IOException, PageOverflowIOException;
+ public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
+ DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
+ if (marshaller != null) {
+ marshaller.writePayload(page.get(), out);
+ }
+ out.close();
+ }
+
+ /**
+ * @throws IOException
+ */
+ public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
+ this.pageFile.assertLoaded();
+
+ // Copy to protect against the end user changing
+ // the page instance while we are doing a write.
+ final Page copy = page.copy();
+ this.pageFile.addToCache(copy);
+
+ //
+ // To support writing VERY large data, we override the output stream so
+ // that we do the page writes incrementally while the data is being
+ // marshalled.
+ DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize * 2) {
+ Page current = copy;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void onWrite() throws IOException {
+
+ // Are we at an overflow condition?
+ if (pos >= Transaction.this.pageFile.pageSize) {
+ // If overflow is allowed
+ if (overflow) {
+
+ Page next;
+ if (current.getType() == Page.PAGE_PART_TYPE) {
+ next = load(current.getNext(), null);
+ } else {
+ next = allocate();
+ }
+
+ next.txId = current.txId;
+
+ // Write the page header
+ int oldPos = pos;
+ pos = 0;
+
+ current.makePagePart(next.getPageId(), Transaction.this.pageFile.nextTxid.get());
+ current.write(this);
+
+ // Do the page write..
+ byte[] data = new byte[Transaction.this.pageFile.pageSize];
+ System.arraycopy(buf, 0, data, 0, Transaction.this.pageFile.pageSize);
+ Transaction.this.write(current, data);
+
+ // Reset for the next page chunk
+ pos = 0;
+ // The page header marshalled after the data is written.
+ skip(Page.PAGE_HEADER_SIZE);
+ // Move the overflow data after the header.
+ System.arraycopy(buf, Transaction.this.pageFile.pageSize, buf, pos, oldPos - Transaction.this.pageFile.pageSize);
+ pos += oldPos - Transaction.this.pageFile.pageSize;
+ current = next;
+
+ } else {
+ throw new PageOverflowIOException("Page overflow.");
+ }
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void close() throws IOException {
+ super.close();
+
+ // We need to free up the rest of the page chain..
+ if (current.getType() == Page.PAGE_PART_TYPE) {
+ free(current.getNext());
+ }
+
+ current.makePageEnd(pos, Transaction.this.pageFile.nextTxid.get());
+
+ // Write the header..
+ pos = 0;
+ current.write(this);
+
+ Transaction.this.write(current, buf);
+ }
+ };
+
+ // The page header marshaled after the data is written.
+ out.skip(Page.PAGE_HEADER_SIZE);
+ return out;
+ }
+
+ /**
+ * @param page
+ * @param data
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ private <T> void write(final Page<T> page, byte[] data) throws IOException {
+ Long key = page.getPageId();
+ synchronized (this.pageFile.writes) {
+ // If it's not in the write cache...
+ PageWrite<T> write = this.pageFile.writes.get(key);
+ if (write == null) {
+ write = this.pageFile.new PageWrite<T>(page, data);
+ this.pageFile.writes.put(key, write);
+ } else {
+ write.setCurrent(page, data);
+ }
+
+ // Once we start approaching capacity, notify the writer to start
+ // writing
+ if (this.pageFile.canStartWriteBatch()) {
+ if (this.pageFile.enableAsyncWrites) {
+ this.pageFile.writes.notify();
+ } else {
+ while (this.pageFile.canStartWriteBatch()) {
+ this.pageFile.writeBatch(-1, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }
+ }
/**
* Loads a page from disk.
@@ -175,7 +406,12 @@
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException;
+ public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
+ this.pageFile.assertLoaded();
+ Page<T> page = new Page<T>(pageId);
+ load(page, marshaller);
+ return page;
+ }
/**
* Loads a page from disk. If the page.pageId is not valid then this method will set the page.type to
@@ -189,31 +425,163 @@
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException;
+ public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
+ this.pageFile.assertLoaded();
+
+ // Can't load invalid offsets...
+ if (page.getPageId() < 0) {
+ throw new InvalidPageIOException("Page id is not valid", page.getPageId());
+ }
+
+ // Try to load it from the cache first...
+ Page<T> t = this.pageFile.getFromCache(page.getPageId());
+ if (t != null) {
+ page.copy(t);
+ return;
+ }
+
+ if (marshaller != null) {
+ // Full page read..
+ InputStream is = openInputStream(page);
+ DataInputStream dataIn = new DataInputStream(is);
+ page.set(marshaller.readPayload(dataIn));
+ is.close();
+ } else {
+ // Page header read.
+ DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
+ this.pageFile.readFile.seek(this.pageFile.toOffset(page.getPageId()));
+ this.pageFile.readFile.readFully(in.getRawData(), 0, Page.PAGE_HEADER_SIZE);
+ page.read(in);
+ page.set(null);
+ }
+
+ // Cache it.
+ if (marshaller != null) {
+ this.pageFile.addToCache(page);
+ }
+ }
-
- /**
- *
- * @param page
- * @param overflow
- * @return
- * @throws IOException
- */
- public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException;
-
- /**
- *
- * @param p
- * @return
- * @throws IOException
- */
- public InputStream openInputStream(final Page p) throws IOException;
-
/**
- * @return the number of pages allocated in the PageFile
+ * @see org.apache.kahadb.page.Transaction#openInputStream(org.apache.kahadb.page.Page)
*/
- public long getPageCount();
-
+ public InputStream openInputStream(final Page p) throws IOException {
+
+ return new InputStream() {
+
+ private ByteSequence chunk = new ByteSequence(new byte[Transaction.this.pageFile.pageSize]);
+ private Page page = readPage(p);
+ private int pageCount = 1;
+
+ private Page markPage;
+ private ByteSequence markChunk;
+
+ private Page readPage(Page page) throws IOException {
+ // Read the page data
+ Transaction.this.pageFile.readFile.seek(Transaction.this.pageFile.toOffset(page.getPageId()));
+ Transaction.this.pageFile.readFile.readFully(chunk.getData(), 0, Transaction.this.pageFile.pageSize);
+ chunk.setOffset(0);
+ chunk.setLength(Transaction.this.pageFile.pageSize);
+
+ DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
+ page.read(in);
+
+ chunk.setOffset(Page.PAGE_HEADER_SIZE);
+ if (page.getType() == Page.PAGE_END_TYPE) {
+ chunk.setLength((int)(page.getNext()));
+ }
+
+ if (page.getType() == Page.PAGE_FREE_TYPE) {
+ throw new EOFException("Chunk stream does not exist at page: " + page.getPageId());
+ }
+
+ return page;
+ }
+
+ public int read() throws IOException {
+ if (!atEOF()) {
+ return chunk.data[chunk.offset++] & 0xff;
+ } else {
+ return -1;
+ }
+ }
+
+ private boolean atEOF() throws IOException {
+ if (chunk.offset < chunk.length) {
+ return false;
+ }
+ if (page.getType() == Page.PAGE_END_TYPE) {
+ return true;
+ }
+ fill();
+ return chunk.offset >= chunk.length;
+ }
+
+ private void fill() throws IOException {
+ page = readPage(new Page(page.getNext()));
+ pageCount++;
+ }
+
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ public int read(byte b[], int off, int len) throws IOException {
+ if (!atEOF()) {
+ int rc = 0;
+ while (!atEOF() && rc < len) {
+ len = Math.min(len, chunk.length - chunk.offset);
+ if (len > 0) {
+ System.arraycopy(chunk.data, chunk.offset, b, off, len);
+ chunk.offset += len;
+ }
+ rc += len;
+ }
+ return rc;
+ } else {
+ return -1;
+ }
+ }
+
+ public long skip(long len) throws IOException {
+ if (!atEOF()) {
+ int rc = 0;
+ while (!atEOF() && rc < len) {
+ len = Math.min(len, chunk.length - chunk.offset);
+ if (len > 0) {
+ chunk.offset += len;
+ }
+ rc += len;
+ }
+ return rc;
+ } else {
+ return -1;
+ }
+ }
+
+ public int available() {
+ return chunk.length - chunk.offset;
+ }
+
+ public boolean markSupported() {
+ return true;
+ }
+
+ public void mark(int markpos) {
+ markPage = page;
+ byte data[] = new byte[Transaction.this.pageFile.pageSize];
+ System.arraycopy(chunk.getData(), 0, data, 0, Transaction.this.pageFile.pageSize);
+ markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
+ }
+
+ public void reset() {
+ page = markPage;
+ chunk = markChunk;
+ }
+
+ };
+ }
+
/**
* Allows you to iterate through all active Pages in this object. Pages with type Page.FREE_TYPE are
* not included in this iteration.
@@ -223,7 +591,10 @@
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- public Iterator<Page> iterator();
+ @SuppressWarnings("unchecked")
+ public Iterator<Page> iterator() {
+ return (Iterator<Page>)iterator(false);
+ }
/**
* Allows you to iterate through all active Pages in this object. You can optionally include free pages in the pages
@@ -234,13 +605,83 @@
* @throws IllegalStateException
* if the PageFile is not loaded
*/
- public <T> Iterator<Page<T>> iterator(final boolean includeFreePages);
+ public Iterator<Page> iterator(final boolean includeFreePages) {
+
+ this.pageFile.assertLoaded();
+
+ return new Iterator<Page>() {
+ long nextId;
+ Page nextPage;
+ Page lastPage;
+
+ private void findNextPage() {
+ if (!Transaction.this.pageFile.loaded.get()) {
+ throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
+ }
+
+ if (nextPage != null) {
+ return;
+ }
+
+ try {
+ while (nextId < Transaction.this.pageFile.nextFreePageId) {
+
+ Page page = load(nextId, null);
+
+ if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
+ nextPage = page;
+ return;
+ } else {
+ nextId++;
+ }
+ }
+ } catch (IOException e) {
+ }
+ }
+
+ public boolean hasNext() {
+ findNextPage();
+ return nextPage != null;
+ }
+
+ public Page next() {
+ findNextPage();
+ if (nextPage != null) {
+ lastPage = nextPage;
+ nextPage = null;
+ nextId++;
+ return lastPage;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ public void remove() {
+ if (lastPage == null) {
+ throw new IllegalStateException();
+ }
+ try {
+ free(lastPage);
+ lastPage = null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
/**
* Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
* with the transaction are written to disk or none will be.
*/
- public void commit() throws IOException;
+ public void commit() throws IOException {
+ }
+
+ /**
+ * Rolls back the transaction.
+ */
+ private void rollback() throws IOException {
+ }
/**
* Executes a closure and if it does not throw any exceptions, then it commits the transaction.
@@ -251,7 +692,19 @@
* @throws T if the closure throws it
* @throws IOException If the commit fails.
*/
- public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException;
+ public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
+ boolean success = false;
+ try {
+ closure.execute(this);
+ success = true;
+ } finally {
+ if (success) {
+ commit();
+ } else {
+ rollback();
+ }
+ }
+ }
/**
* Executes a closure and if it does not throw any exceptions, then it commits the transaction.
@@ -262,12 +715,33 @@
* @throws T if the closure throws it
* @throws IOException If the commit fails.
*/
- public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException;
+ public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
+ boolean success = false;
+ try {
+ R rc = closure.execute(this);
+ success = true;
+ return rc;
+ } finally {
+ if (success) {
+ commit();
+ } else {
+ rollback();
+ }
+ }
+ }
/**
- *
* @return true if there are no uncommitted page file updates associated with this transaction.
*/
- public boolean isReadOnly();
+ public boolean isReadOnly() {
+ return false;
+ }
+
+ /**
+ * @return the number of pages allocated in the PageFile
+ */
+ public long getPageCount() {
+ return this.pageFile.nextFreePageId;
+ }
}
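Taken together, the unit-of-work contract now lives in one concrete place: execute(...) commits when the closure completes and rolls back when it throws, even though commit() and rollback() are still empty stubs at this revision. A hedged sketch of the value-returning form, again assuming a loaded PageFile named "pageFile" and an illustrative Marshaller<String> named "stringMarshaller":

    import java.io.IOException;

    import org.apache.kahadb.Marshaller;
    import org.apache.kahadb.page.Page;
    import org.apache.kahadb.page.PageFile;
    import org.apache.kahadb.page.Transaction;

    public class AllocateSketch {
        public static long allocateAndStore(PageFile pageFile, final Marshaller<String> stringMarshaller) throws IOException {
            // commit() runs only if the closure completes; rollback() if it
            // throws. Both are no-ops at r693109, as the diff above shows.
            return pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
                public Long execute(Transaction tx) throws IOException {
                    Page<String> page = tx.allocate();
                    page.set("a large value can span pages");
                    tx.store(page, stringMarshaller, true); // overflow allowed: chains pages
                    return page.getPageId();
                }
            });
        }
    }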