Author: toad
Date: 2008-10-22 12:49:39 +0000 (Wed, 22 Oct 2008)
New Revision: 23033
Added:
branches/db4o/freenet/src/freenet/support/io/BucketArrayWrapper.java
branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucket.java
branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucketKillJob.java
branches/db4o/freenet/src/freenet/support/io/SegmentedChainBucketSegment.java
Modified:
branches/db4o/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java
branches/db4o/freenet/src/freenet/support/io/BucketTools.java
Log:
Segmented bucket chain bucket: groups every 128 buckets into a segment, stores
each segment to the database and deactivates it. This saves a lot of memory
during the splitting phase. A rough usage sketch follows.
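
A minimal sketch of the intended usage (hypothetical wiring: the BucketFactory,
the DBJobRunner and the data are assumed to come from the surrounding client
context, and writeSplitData is not a method in this commit):

	static Bucket writeSplitData(BucketFactory bucketFactory, DBJobRunner jobRunner,
			byte[] data) throws IOException {
		BucketChainBucketFactory factory = new BucketChainBucketFactory(
				bucketFactory, NodeCHK.BLOCK_SIZE, jobRunner,
				HighLevelSimpleClientImpl.SPLITFILE_BLOCKS_PER_SEGMENT);
		// With a non-null DBJobRunner this returns a SegmentedBucketChainBucket.
		Bucket bucket = factory.makeBucket(data.length);
		OutputStream os = bucket.getOutputStream();
		try {
			// Every 128 full buckets become one segment, which is stored to
			// the database and deactivated before the next one is started.
			os.write(data);
		} finally {
			os.close(); // close() stores the final segment and commits progress.
		}
		return bucket;
	}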
Modified:
branches/db4o/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/HighLevelSimpleClientImpl.java	2008-10-22 12:40:32 UTC (rev 23032)
+++ branches/db4o/freenet/src/freenet/client/HighLevelSimpleClientImpl.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -76,7 +76,7 @@
// going by memory usage only; 4kB per stripe
static final int MAX_SPLITFILE_BLOCKS_PER_SEGMENT = 1024;
static final int MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 1536;
- static final int SPLITFILE_BLOCKS_PER_SEGMENT = 128;
+ public static final int SPLITFILE_BLOCKS_PER_SEGMENT = 128;
static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 128;
Modified: branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java	2008-10-22 12:40:32 UTC (rev 23032)
+++ branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -6,6 +6,8 @@
import com.db4o.ObjectSet;
import com.db4o.query.Query;
+import freenet.client.HighLevelSimpleClient;
+import freenet.client.HighLevelSimpleClientImpl;
import freenet.client.InsertException;
import freenet.keys.NodeCHK;
import freenet.support.Logger;
@@ -123,7 +125,7 @@
			Compressor comp = Compressor.getCompressionAlgorithmByDifficulty(i);
Bucket result;
-			result = comp.compress(origData, new BucketChainBucketFactory(bucketFactory, NodeCHK.BLOCK_SIZE, persistent ? context.jobRunner : null), origData.size());
+			result = comp.compress(origData, new BucketChainBucketFactory(bucketFactory, NodeCHK.BLOCK_SIZE, persistent ? context.jobRunner : null, HighLevelSimpleClientImpl.SPLITFILE_BLOCKS_PER_SEGMENT), origData.size());
if(result.size() < minSize) {
bestCodec = comp;
if(bestCompressedData != null)
Added: branches/db4o/freenet/src/freenet/support/io/BucketArrayWrapper.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/BucketArrayWrapper.java	(rev 0)
+++ branches/db4o/freenet/src/freenet/support/io/BucketArrayWrapper.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -0,0 +1,9 @@
+package freenet.support.io;
+
+import freenet.support.api.Bucket;
+
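+/** Simple mutable holder, used to pass a Bucket[] result out of a DBJob callback. */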
+public class BucketArrayWrapper {
+
+ public Bucket[] buckets;
+
+}
Modified:
branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java	2008-10-22 12:40:32 UTC (rev 23032)
+++ branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -11,6 +11,7 @@
final BucketFactory factory;
final int blockSize;
final DBJobRunner runner;
+ final int segmentSize;
/**
	 * If you want persistent buckets which will be saved every 1000 buckets, and
@@ -20,14 +21,18 @@
* @param block_size
* @param runner
*/
-	public BucketChainBucketFactory(BucketFactory bucketFactory, int block_size, DBJobRunner runner) {
+	public BucketChainBucketFactory(BucketFactory bucketFactory, int block_size, DBJobRunner runner, int segmentSize) {
this.factory = bucketFactory;
this.blockSize = block_size;
this.runner = runner;
+ this.segmentSize = segmentSize;
}
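+	/** With a DBJobRunner (persistent requests) return the new segmented variant,
+	 * so that full segments can be stored to the database and deactivated;
+	 * without one, keep returning the plain in-memory chain. */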
public Bucket makeBucket(long size) throws IOException {
- return new BucketChainBucket(blockSize, factory, runner);
+ if(runner == null)
+			return new BucketChainBucket(blockSize, factory, runner);
+		else
+			return new SegmentedBucketChainBucket(blockSize, factory, runner, segmentSize);
}
}
Modified: branches/db4o/freenet/src/freenet/support/io/BucketTools.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/BucketTools.java	2008-10-22 12:40:32 UTC (rev 23032)
+++ branches/db4o/freenet/src/freenet/support/io/BucketTools.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -357,6 +357,17 @@
				Logger.error(BucketTools.class, "Incompatible split size splitting a BucketChainBucket: his split size is "+data.bucketSize+" but mine is "+splitSize+" - we will copy the data, but this suggests a bug", new Exception("debug"));
}
}
+		if(origData instanceof SegmentedBucketChainBucket) {
+			SegmentedBucketChainBucket data = (SegmentedBucketChainBucket)origData;
+			if(data.bucketSize == splitSize) {
+				Bucket[] buckets = data.getBuckets();
+				if(freeData)
+					data.clear();
+				return buckets;
+			} else {
+				Logger.error(BucketTools.class, "Incompatible split size splitting a SegmentedBucketChainBucket: its split size is "+data.bucketSize+" but mine is "+splitSize+" - we will copy the data, but this suggests a bug", new Exception("debug"));
+			}
+		}
long length = origData.size();
if(length > ((long)Integer.MAX_VALUE) * splitSize)
			throw new IllegalArgumentException("Way too big!: "+length+" for "+splitSize);
Added:
branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucket.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucket.java	(rev 0)
+++ branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucket.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -0,0 +1,332 @@
+package freenet.support.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import com.db4o.ObjectContainer;
+
+import freenet.client.async.ClientContext;
+import freenet.client.async.DBJob;
+import freenet.client.async.DBJobRunner;
+import freenet.support.api.Bucket;
+import freenet.support.api.BucketFactory;
+
+/**
+ * Splits a large persistent file into a series of buckets, which are collected
+ * into groups called segments to avoid huge transactions/memory usage.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ */
+public class SegmentedBucketChainBucket implements Bucket {
+
+ private final ArrayList<SegmentedChainBucketSegment> segments;
+ private boolean readOnly;
+ public final long bucketSize;
+ public final int segmentSize;
+ private long size;
+ private boolean freed;
+ final BucketFactory bf;
+ private transient DBJobRunner dbJobRunner;
+ boolean stored;
+
+ public SegmentedBucketChainBucket(int blockSize, BucketFactory factory,
+ DBJobRunner runner, int segmentSize2) {
+ bucketSize = blockSize;
+ bf = factory;
+ dbJobRunner = runner;
+ segmentSize = segmentSize2;
+ segments = new ArrayList<SegmentedChainBucketSegment>();
+ }
+
+ public Bucket createShadow() throws IOException {
+ return null;
+ }
+
+ public void free() {
+ SegmentedChainBucketSegment[] segs;
+ synchronized(this) {
+			segs = segments.toArray(new SegmentedChainBucketSegment[segments.size()]);
+ segments.clear();
+ }
+ for(SegmentedChainBucketSegment segment : segs)
+ segment.free();
+ }
+
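+	/** Streams the whole chain: buckets are opened one at a time, and each
+	 * segment's bucket array is fetched from the database only when needed. */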
+ public InputStream getInputStream() throws IOException {
+ synchronized(this) {
+ if(freed) throw new IOException("Freed");
+ }
+ return new InputStream() {
+
+ int segmentNo = 0;
+ int bucketNo = 0;
+ SegmentedChainBucketSegment seg = getSegment(0);
+ Bucket[] buckets = seg == null ? null : getBuckets(seg);
+			InputStream is = buckets == null ? null : buckets[0].getInputStream();
+ private long bucketRead = 0;
+ private boolean closed;
+
+ @Override
+ public int read() throws IOException {
+ byte[] b = new byte[1];
+ if(read(b, 0, 1) <= 0) return -1;
+				return b[0] & 0xFF; // mask so bytes >= 0x80 aren't returned as negative / mistaken for EOF
+ }
+
+ @Override
+ public int read(byte[] buf) throws IOException {
+ return read(buf, 0, buf.length);
+ }
+
+ @Override
+			public int read(byte[] buf, int offset, int length) throws IOException {
+				if(closed) throw new IOException("Already closed");
+				if(bucketRead == bucketSize || is == null) {
+					if(is != null) is.close();
+					if(buckets != null) buckets[bucketNo] = null;
+					bucketRead = 0;
+					bucketNo++;
+					if(bucketNo == segmentSize) {
+						bucketNo = 0;
+						segmentNo++;
+						seg = getSegment(segmentNo);
+						if(seg == null) return -1;
+						// Moved on to a new segment, so refresh the cached bucket array.
+						buckets = getBuckets(seg);
+					}
+					if(buckets == null || bucketNo >= buckets.length) {
+						synchronized(SegmentedBucketChainBucket.this) {
+							if(segmentNo >= segments.size())
+								// No more data
+								return -1;
+						}
+						buckets = getBuckets(seg);
+						if(bucketNo >= buckets.length)
+							return -1;
+					}
+					is = buckets[bucketNo].getInputStream();
+				}
+ int r = is.read(buf, offset, length);
+ if(r > 0)
+ bucketRead += r;
+ return r;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(closed) return;
+				if(is != null) is.close();
+ closed = true;
+ is = null;
+ seg = null;
+ buckets = null;
+ }
+
+ };
+ }
+
+ protected synchronized SegmentedChainBucketSegment getSegment(int i) {
+		return i >= segments.size() ? null : segments.get(i);
+ }
+
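+	/** Fetch a segment's buckets on the database thread: activate the segment,
+	 * shallow-copy its bucket list, then deactivate it again to save memory. */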
+ protected Bucket[] getBuckets(final SegmentedChainBucketSegment seg) {
+ final BucketArrayWrapper baw = new BucketArrayWrapper();
+ dbJobRunner.runBlocking(new DBJob() {
+
+			public void run(ObjectContainer container, ClientContext context) {
+ container.activate(seg, 1);
+ baw.buckets = seg.shallowCopyBuckets();
+ container.deactivate(seg, 1);
+ }
+
+ }, NativeThread.HIGH_PRIORITY);
+ return baw.buckets;
+ }
+
+ public String getName() {
+		return "SegmentedBucketChainBucket";
+ }
+
+ public OutputStream getOutputStream() throws IOException {
+ SegmentedChainBucketSegment[] segs;
+ synchronized(this) {
+ if(readOnly) throw new IOException("Read-only");
+ if(freed) throw new IOException("Freed");
+ size = 0;
+			segs = segments.toArray(new SegmentedChainBucketSegment[segments.size()]);
+ segments.clear();
+ }
+ for(int i=0;i<segs.length;i++) {
+ segs[i].free();
+ }
+ return new OutputStream() {
+
+ int segmentNo = 0;
+ int bucketNo = 0;
+			SegmentedChainBucketSegment seg = makeSegment(segmentNo, null);
+ OutputStream cur = seg.makeBucketStream(bucketNo);
+ private long bucketLength;
+ private boolean closed;
+
+ @Override
+ public void write(int arg0) throws IOException {
+ write(new byte[] { (byte)arg0 });
+ }
+
+ @Override
+ public void write(byte[] buf) throws IOException {
+ write(buf, 0, buf.length);
+ }
+
+ @Override
+			public void write(byte[] buf, int offset, int length) throws IOException {
+				boolean ro;
+				synchronized(SegmentedBucketChainBucket.this) {
+					ro = readOnly;
+				}
+				if(ro) {
+					if(!closed) close();
+					throw new IOException("Read-only");
+				}
+				if(closed) throw new IOException("Already closed");
+				while(length > 0) {
+					if(bucketLength == bucketSize) {
+						bucketNo++;
+						cur.close();
+						if(bucketNo == segmentSize) {
+							bucketNo = 0;
+							segmentNo++;
+							seg = makeSegment(segmentNo, seg);
+						}
+						cur = seg.makeBucketStream(bucketNo);
+						bucketLength = 0;
+					}
+					int left = (int)Math.min(Integer.MAX_VALUE, bucketSize - bucketLength);
+					int write = Math.min(left, length);
+					cur.write(buf, offset, write);
+					offset += write;
+					length -= write;
+					bucketLength += write;
+					// Lock the instance (as size() does) when updating size.
+					synchronized(SegmentedBucketChainBucket.this) {
+						size += write;
+					}
+				}
+			}
+
+ @Override
+ public void close() throws IOException {
+ if(closed) return;
+ cur.close();
+ closed = true;
+ cur = null;
+ final SegmentedChainBucketSegment oldSeg = seg;
+ seg = null;
+ dbJobRunner.runBlocking(new DBJob() {
+
+					public void run(ObjectContainer container, ClientContext context) {
+						oldSeg.storeTo(container);
+						container.ext().store(segments, 1);
+						container.ext().store(SegmentedBucketChainBucket.this, 1);
+						container.deactivate(oldSeg, 1);
+						dbJobRunner.removeRestartJob(killMe, NativeThread.HIGH_PRIORITY, container);
+ }
+
+ }, NativeThread.HIGH_PRIORITY);
+ }
+ };
+ }
+
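+	/** Restart job: if the node restarts before storeTo() marks this bucket as
+	 * stored, the partially written segments are cleaned up at startup. */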
+	private final DBJob killMe = new SegmentedBucketChainBucketKillJob(this);
+
+	protected synchronized SegmentedChainBucketSegment makeSegment(int index, final SegmentedChainBucketSegment oldSeg) {
+		if(oldSeg != null) {
+			dbJobRunner.runBlocking(new DBJob() {
+
+				public void run(ObjectContainer container, ClientContext context) {
+					oldSeg.storeTo(container);
+					container.ext().store(segments, 1);
+					container.ext().store(SegmentedBucketChainBucket.this, 1);
+					container.deactivate(oldSeg, 1);
+					dbJobRunner.queueRestartJob(killMe, NativeThread.HIGH_PRIORITY, container);
+				}
+
+			}, NativeThread.NORM_PRIORITY);
+		}
+		SegmentedChainBucketSegment seg = new SegmentedChainBucketSegment(this);
+		if(segments.size() != index) throw new IllegalArgumentException("Asked to add segment "+index+" but segments length is "+segments.size());
+ segments.add(seg);
+ return seg;
+ }
+
+ public boolean isReadOnly() {
+ return readOnly;
+ }
+
+ public void removeFrom(ObjectContainer container) {
+ // FIXME do something
+ }
+
+ public void setReadOnly() {
+ readOnly = true;
+ }
+
+ public synchronized long size() {
+ return size;
+ }
+
+	/**
+	 * Note that we don't recurse inside the segments, as that would produce a huge
+	 * transaction. So you will need to close the OutputStream to commit the
+	 * progress of writing to a file. And no, we can't append: you need to
+	 * write everything before storing the bucket.
+	 */
+ public void storeTo(ObjectContainer container) {
+ stored = true;
+		dbJobRunner.removeRestartJob(killMe, NativeThread.HIGH_PRIORITY, container);
+ container.ext().store(segments, 1);
+ container.ext().store(this, 1);
+ }
+
+ public Bucket[] getBuckets() {
+ final BucketArrayWrapper baw = new BucketArrayWrapper();
+ dbJobRunner.runBlocking(new DBJob() {
+
+			public void run(ObjectContainer container, ClientContext context) {
+ baw.buckets = getBuckets(container);
+ }
+
+ }, NativeThread.HIGH_PRIORITY);
+ return baw.buckets;
+ }
+
+ protected synchronized Bucket[] getBuckets(ObjectContainer container) {
+ int segs = segments.size();
+ if(segs == 0) return new Bucket[0];
+ SegmentedChainBucketSegment seg = segments.get(segs-1);
+ container.activate(seg, 1);
+ seg.activateBuckets(container);
+ int size = (segs - 1) * segmentSize + seg.size();
+ Bucket[] buckets = new Bucket[size];
+		Bucket[] temp = seg.shallowCopyBuckets();
+		container.deactivate(seg, 1);
+		System.arraycopy(temp, 0, buckets, (segs-1)*segmentSize, temp.length);
+		temp = null;
+ int pos = 0;
+ for(int i=0;i<(segs-1);i++) {
+ seg = segments.get(i);
+ container.activate(seg, 1);
+ seg.activateBuckets(container);
+ temp = seg.shallowCopyBuckets();
+ container.deactivate(seg, 1);
+ System.arraycopy(temp, 0, buckets, pos, segmentSize);
+ pos += segmentSize;
+ }
+ return buckets;
+ }
+
+ public synchronized void clear() {
+ segments.clear();
+ }
+
+}
Added:
branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucketKillJob.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucketKillJob.java	(rev 0)
+++ branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucketKillJob.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -0,0 +1,23 @@
+package freenet.support.io;
+
+import com.db4o.ObjectContainer;
+
+import freenet.client.async.ClientContext;
+import freenet.client.async.DBJob;
+
+public class SegmentedBucketChainBucketKillJob implements DBJob {
+
+ final SegmentedBucketChainBucket bcb;
+
+	public SegmentedBucketChainBucketKillJob(SegmentedBucketChainBucket bucket) {
+ bcb = bucket;
+ }
+
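+	// Runs on restart: if the bucket was never marked as stored, its
+	// half-written segments are orphans, so remove the bucket from the database.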
+ public void run(ObjectContainer container, ClientContext context) {
+ container.activate(bcb, 1);
+ if(bcb.stored) return;
+		System.err.println("Freeing unfinished unstored bucket "+bcb);
+ bcb.removeFrom(container);
+ }
+
+}
Added:
branches/db4o/freenet/src/freenet/support/io/SegmentedChainBucketSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/SegmentedChainBucketSegment.java	(rev 0)
+++ branches/db4o/freenet/src/freenet/support/io/SegmentedChainBucketSegment.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -0,0 +1,60 @@
+package freenet.support.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import com.db4o.ObjectContainer;
+
+import freenet.support.api.Bucket;
+
+public class SegmentedChainBucketSegment {
+
+ private final ArrayList<Bucket> buckets;
+ private final SegmentedBucketChainBucket bcb;
+
+ public SegmentedChainBucketSegment(SegmentedBucketChainBucket bucket) {
+ this.bcb = bucket;
+ this.buckets = new ArrayList<Bucket>();
+ }
+
+ public void free() {
+ for(Bucket bucket : buckets)
+ bucket.free();
+ }
+
+ public void storeTo(ObjectContainer container) {
+ for(Bucket bucket : buckets)
+ bucket.storeTo(container);
+ container.ext().store(buckets, 1);
+ container.ext().store(this, 1);
+ }
+
+ public synchronized Bucket[] shallowCopyBuckets() {
+ int sz = buckets.size();
+ Bucket[] out = new Bucket[sz];
+ for(int i=0;i<sz;i++) out[i] = buckets.get(i);
+ return out;
+ }
+
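+	/** Create the next bucket in this segment and return a stream writing to it.
+	 * Buckets must be created strictly in order. */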
+ public OutputStream makeBucketStream(int bucketNo) throws IOException {
+ if(bucketNo >= bcb.segmentSize)
+			throw new IllegalArgumentException("Too many buckets in segment");
+ Bucket b = bcb.bf.makeBucket(bcb.bucketSize);
+ synchronized(this) {
+ if(buckets.size() != bucketNo)
+				throw new IllegalArgumentException("Next bucket should be "+buckets.size()+" but is "+bucketNo);
+ buckets.add(b);
+ }
+ return b.getOutputStream();
+ }
+
+ public int size() {
+ return buckets.size();
+ }
+
+ void activateBuckets(ObjectContainer container) {
+ container.activate(buckets, 1);
+ }
+
+}