Author: toad
Date: 2008-10-22 12:49:39 +0000 (Wed, 22 Oct 2008)
New Revision: 23033

Added:
   branches/db4o/freenet/src/freenet/support/io/BucketArrayWrapper.java
   branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucket.java
   branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucketKillJob.java
   branches/db4o/freenet/src/freenet/support/io/SegmentedChainBucketSegment.java
Modified:
   branches/db4o/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
   branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
   branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java
   branches/db4o/freenet/src/freenet/support/io/BucketTools.java
Log:
Segmented bucket chain bucket: collects buckets into segments of 128, storing
each segment to the database and deactivating it as soon as it is full.
This saves a lot of memory during the splitting phase.
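
A rough sketch of the arithmetic (illustrative, not part of this commit; it
assumes 32KiB buckets, i.e. NodeCHK.BLOCK_SIZE, and 128 buckets per segment,
so one segment covers 4MiB and only the segment currently being written needs
to stay activated):

    // Hypothetical helper, not in this commit: how many segments a file needs.
    static int segmentsNeeded(long fileSize, int bucketSize, int segmentSize) {
        long buckets = (fileSize + bucketSize - 1) / bucketSize;  // ceiling division
        return (int) ((buckets + segmentSize - 1) / segmentSize); // ceiling division
    }

So a 100MiB insert needs 3200 buckets but only 25 segments, and at most one
segment's worth of bucket objects is active at a time.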


Modified: branches/db4o/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/HighLevelSimpleClientImpl.java	2008-10-22 12:40:32 UTC (rev 23032)
+++ branches/db4o/freenet/src/freenet/client/HighLevelSimpleClientImpl.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -76,7 +76,7 @@
        // going by memory usage only; 4kB per stripe
        static final int MAX_SPLITFILE_BLOCKS_PER_SEGMENT = 1024;
        static final int MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 1536;
-       static final int SPLITFILE_BLOCKS_PER_SEGMENT = 128;
+       public static final int SPLITFILE_BLOCKS_PER_SEGMENT = 128;
        static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 128;



Modified: branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java	2008-10-22 12:40:32 UTC (rev 23032)
+++ branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -6,6 +6,8 @@
 import com.db4o.ObjectSet;
 import com.db4o.query.Query;

+import freenet.client.HighLevelSimpleClient;
+import freenet.client.HighLevelSimpleClientImpl;
 import freenet.client.InsertException;
 import freenet.keys.NodeCHK;
 import freenet.support.Logger;
@@ -123,7 +125,7 @@

                                Compressor comp = Compressor.getCompressionAlgorithmByDifficulty(i);
                                Bucket result;
-                               result = comp.compress(origData, new BucketChainBucketFactory(bucketFactory, NodeCHK.BLOCK_SIZE, persistent ? context.jobRunner : null), origData.size());
+                               result = comp.compress(origData, new BucketChainBucketFactory(bucketFactory, NodeCHK.BLOCK_SIZE, persistent ? context.jobRunner : null, HighLevelSimpleClientImpl.SPLITFILE_BLOCKS_PER_SEGMENT), origData.size());
                                if(result.size() < minSize) {
                                        bestCodec = comp;
                                        if(bestCompressedData != null)

Added: branches/db4o/freenet/src/freenet/support/io/BucketArrayWrapper.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/BucketArrayWrapper.java	                        (rev 0)
+++ branches/db4o/freenet/src/freenet/support/io/BucketArrayWrapper.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -0,0 +1,13 @@
+package freenet.support.io;
+
+import freenet.support.api.Bucket;
+
+/**
+ * Trivial mutable holder for a Bucket[], used to pass results out of an
+ * anonymous DBJob run via DBJobRunner.runBlocking().
+ */
+public class BucketArrayWrapper {
+	
+	public Bucket[] buckets;
+
+}

Modified: branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java	2008-10-22 12:40:32 UTC (rev 23032)
+++ branches/db4o/freenet/src/freenet/support/io/BucketChainBucketFactory.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -11,6 +11,7 @@
        final BucketFactory factory;
        final int blockSize;
        final DBJobRunner runner;
+       final int segmentSize;

        /**
         * If you want persistent buckets which will be saved every 1000 buckets, and
@@ -20,14 +21,18 @@
         * @param block_size
         * @param runner
         */
-       public BucketChainBucketFactory(BucketFactory bucketFactory, int block_size, DBJobRunner runner) {
+       public BucketChainBucketFactory(BucketFactory bucketFactory, int block_size, DBJobRunner runner, int segmentSize) {
                this.factory = bucketFactory;
                this.blockSize = block_size;
                this.runner = runner;
+               this.segmentSize = segmentSize;
        }
 
        public Bucket makeBucket(long size) throws IOException {
-               return new BucketChainBucket(blockSize, factory, runner);
+               if(runner == null)
+                       return new BucketChainBucket(blockSize, factory, runner);
+               else
+                       return new SegmentedBucketChainBucket(blockSize, factory, runner, segmentSize);
        }

 }
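
For reference, a minimal sketch of how a caller now constructs the factory
(variable names are illustrative; persistent callers pass a real DBJobRunner,
transient callers pass null and keep getting the plain BucketChainBucket):

    // Illustrative only; mirrors the InsertCompressor call site above.
    BucketFactory chainFactory = new BucketChainBucketFactory(
            bucketFactory,                          // underlying factory for the individual buckets
            NodeCHK.BLOCK_SIZE,                     // one bucket per CHK block
            persistent ? context.jobRunner : null,  // null selects the transient path
            HighLevelSimpleClientImpl.SPLITFILE_BLOCKS_PER_SEGMENT); // 128
    Bucket data = chainFactory.makeBucket(-1);      // SegmentedBucketChainBucket if persistent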

Modified: branches/db4o/freenet/src/freenet/support/io/BucketTools.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/BucketTools.java	2008-10-22 12:40:32 UTC (rev 23032)
+++ branches/db4o/freenet/src/freenet/support/io/BucketTools.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -357,6 +357,17 @@
                                Logger.error(BucketTools.class, "Incompatible split size splitting a BucketChainBucket: his split size is "+data.bucketSize+" but mine is "+splitSize+" - we will copy the data, but this suggests a bug", new Exception("debug"));
                        }
                }
+               if(origData instanceof SegmentedBucketChainBucket) {
+                       SegmentedBucketChainBucket data = (SegmentedBucketChainBucket)origData;
+                       if(data.bucketSize == splitSize) {
+                               Bucket[] buckets = data.getBuckets();
+                               if(freeData)
+                                       data.clear();
+                               return buckets;
+                       } else {
+                               Logger.error(BucketTools.class, "Incompatible split size splitting a SegmentedBucketChainBucket: its split size is "+data.bucketSize+" but mine is "+splitSize+" - we will copy the data, but this suggests a bug", new Exception("debug"));
+                       }
+               }
                long length = origData.size();
                if(length > ((long)Integer.MAX_VALUE) * splitSize)
                        throw new IllegalArgumentException("Way too big!: "+length+" for "+splitSize);

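One subtlety in the new fast path: when freeData is set it calls data.clear()
rather than data.free(), since ownership of the underlying buckets transfers
to the caller; free() would destroy the buckets that were just returned.
Condensed (the enclosing BucketTools method is elided in this diff, so the
signature below is hypothetical):

    // Hypothetical condensed form of the zero-copy fast path above.
    static Bucket[] fastSplit(SegmentedBucketChainBucket data, int splitSize, boolean freeData) {
        if(data.bucketSize != splitSize) return null; // caller falls back to a copying split
        Bucket[] buckets = data.getBuckets();
        if(freeData)
            data.clear(); // drop references only; the caller now owns the buckets
        return buckets;
    }
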
Added: branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucket.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucket.java	                        (rev 0)
+++ branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucket.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -0,0 +1,332 @@
+package freenet.support.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import com.db4o.ObjectContainer;
+
+import freenet.client.async.ClientContext;
+import freenet.client.async.DBJob;
+import freenet.client.async.DBJobRunner;
+import freenet.support.api.Bucket;
+import freenet.support.api.BucketFactory;
+
+/**
+ * Splits a large persistent file into a series of buckets, which are collected
+ * into groups called segments to avoid huge transactions/memory usage.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ */
+public class SegmentedBucketChainBucket implements Bucket {
+
+       private final ArrayList<SegmentedChainBucketSegment> segments;
+       private boolean readOnly;
+       public final long bucketSize;
+       public final int segmentSize;
+       private long size;
+       private boolean freed;
+       final BucketFactory bf;
+       private transient DBJobRunner dbJobRunner;
+       boolean stored;
+       
+       public SegmentedBucketChainBucket(int blockSize, BucketFactory factory, 
+                       DBJobRunner runner, int segmentSize2) {
+               bucketSize = blockSize;
+               bf = factory;
+               dbJobRunner = runner;
+               segmentSize = segmentSize2;
+               segments = new ArrayList<SegmentedChainBucketSegment>();
+       }
+
+       public Bucket createShadow() throws IOException {
+               return null;
+       }
+
+       public void free() {
+               SegmentedChainBucketSegment[] segs;
+               synchronized(this) {
+                       segs = segments.toArray(new SegmentedChainBucketSegment[segments.size()]);
+                       segments.clear();
+               }
+               for(SegmentedChainBucketSegment segment : segs)
+                       segment.free();
+       }
+
+       public InputStream getInputStream() throws IOException {
+               synchronized(this) {
+                       if(freed) throw new IOException("Freed");
+               }
+               return new InputStream() {
+
+                       int segmentNo = 0;
+                       int bucketNo = 0;
+                       SegmentedChainBucketSegment seg = getSegment(0);
+                       Bucket[] buckets = seg == null ? null : getBuckets(seg);
+                       InputStream is = (buckets == null || buckets.length == 0) ? null : buckets[0].getInputStream();
+                       private long bucketRead = 0;
+                       private boolean closed;
+                       
+                       @Override
+                       public int read() throws IOException {
+                               byte[] b = new byte[1];
+                               if(read(b, 0, 1) <= 0) return -1;
+                               return b[0] & 0xFF; // avoid sign-extending bytes >= 0x80
+                       }
+                       
+                       @Override
+                       public int read(byte[] buf) throws IOException {
+                               return read(buf, 0, buf.length);
+                       }
+                       
+                       @Override
+                       public int read(byte[] buf, int offset, int length) throws IOException {
+                               if(closed) throw new IOException("Already closed");
+                               if(is == null) return -1; // empty bucket
+                               if(bucketRead == bucketSize) {
+                                       is.close();
+                                       buckets[bucketNo] = null;
+                                       bucketRead = 0;
+                                       bucketNo++;
+                                       if(bucketNo == segmentSize) {
+                                               bucketNo = 0;
+                                               segmentNo++;
+                                               synchronized(SegmentedBucketChainBucket.this) {
+                                                       if(segmentNo >= segments.size())
+                                                               // No more data
+                                                               return -1;
+                                               }
+                                               seg = getSegment(segmentNo);
+                                               if(seg == null) return -1;
+                                               // Fetch the new segment's bucket array.
+                                               buckets = getBuckets(seg);
+                                       }
+                                       if(bucketNo >= buckets.length)
+                                               return -1; // fully read
+                                       is = buckets[bucketNo].getInputStream();
+                               }
+                               int r = is.read(buf, offset, length);
+                               if(r > 0)
+                                       bucketRead += r;
+                               return r;
+                       }
+                       
+                       @Override
+                       public void close() throws IOException {
+                               if(closed) return;
+                               if(is != null) is.close();
+                               closed = true;
+                               is = null;
+                               seg = null;
+                               buckets = null;
+                       }
+                       
+               };
+       }
+
+       protected synchronized SegmentedChainBucketSegment getSegment(int i) {
+               if(i >= segments.size()) return null; // past the end of the data
+               return segments.get(i);
+       }
+
+       protected Bucket[] getBuckets(final SegmentedChainBucketSegment seg) {
+               Bucket[] buckets;
+               final BucketArrayWrapper baw = new BucketArrayWrapper();
+               dbJobRunner.runBlocking(new DBJob() {
+
+                       public void run(ObjectContainer container, ClientContext context) {
+                               container.activate(seg, 1);
+                               baw.buckets = seg.shallowCopyBuckets();
+                               container.deactivate(seg, 1);
+                       }
+                       
+               }, NativeThread.HIGH_PRIORITY);
+               return baw.buckets;
+       }
+
+       public String getName() {
+               return "SegmentedBucketChainBucket";
+       }
+
+       public OutputStream getOutputStream() throws IOException {
+               SegmentedChainBucketSegment[] segs;
+               synchronized(this) {
+                       if(readOnly) throw new IOException("Read-only");
+                       if(freed) throw new IOException("Freed");
+                       size = 0;
+                       segs = segments.toArray(new SegmentedChainBucketSegment[segments.size()]);
+                       segments.clear();
+               }
+               for(int i=0;i<segs.length;i++) {
+                       segs[i].free();
+               }
+               return new OutputStream() {
+                       
+                       int segmentNo = 0;
+                       int bucketNo = 0;
+                       SegmentedChainBucketSegment seg = makeSegment(segmentNo, null);
+                       OutputStream cur = seg.makeBucketStream(bucketNo);
+                       private long bucketLength;
+                       private boolean closed;
+
+                       @Override
+                       public void write(int arg0) throws IOException {
+                               write(new byte[] { (byte)arg0 });
+                       }
+                       
+                       @Override
+                       public void write(byte[] buf) throws IOException {
+                               write(buf, 0, buf.length);
+                       }
+                       
+                       @Override
+                       public void write(byte[] buf, int offset, int length) throws IOException {
+                               boolean ro;
+                               synchronized(SegmentedBucketChainBucket.this) {
+                                       ro = readOnly;
+                               }
+                               if(ro) {
+                                       if(!closed) close();
+                                       throw new IOException("Read-only");
+                               }
+                               if(closed) throw new IOException("Already closed");
+                               while(length > 0) {
+                                       if(bucketLength == bucketSize) {
+                                               bucketNo++;
+                                               cur.close();
+                                               if(bucketNo == segmentSize) {
+                                                       bucketNo = 0;
+                                                       segmentNo++;
+                                                       seg = makeSegment(segmentNo, seg);
+                                               }
+                                               cur = seg.makeBucketStream(bucketNo);
+                                               bucketLength = 0;
+                                       }
+                                       int left = (int)Math.min(Integer.MAX_VALUE, bucketSize - bucketLength);
+                                       int write = Math.min(left, length);
+                                       cur.write(buf, offset, write);
+                                       offset += write;
+                                       length -= write;
+                                       bucketLength += write;
+                                       synchronized(SegmentedBucketChainBucket.this) {
+                                               size += write;
+                                       }
+                               }
+                       }
+                       
+                       @Override
+                       public void close() throws IOException {
+                               if(closed) return;
+                               cur.close();
+                               closed = true;
+                               cur = null;
+                               final SegmentedChainBucketSegment oldSeg = seg;
+                               seg = null;
+                               dbJobRunner.runBlocking(new DBJob() {
+                                       
+                                       public void run(ObjectContainer container, ClientContext context) {
+                                               oldSeg.storeTo(container);
+                                               container.ext().store(segments, 1);
+                                               container.ext().store(SegmentedBucketChainBucket.this, 1);
+                                               container.deactivate(oldSeg, 1);
+                                               dbJobRunner.removeRestartJob(killMe, NativeThread.HIGH_PRIORITY, container);
+                                       }
+                                       
+                               }, NativeThread.HIGH_PRIORITY);
+                       }
+               };
+       }
+
+       private final DBJob killMe = new SegmentedBucketChainBucketKillJob(this);
+       
+       protected synchronized SegmentedChainBucketSegment makeSegment(int index, final SegmentedChainBucketSegment oldSeg) {
+               if(oldSeg != null) {
+                       dbJobRunner.runBlocking(new DBJob() {
+                               
+                               public void run(ObjectContainer container, ClientContext context) {
+                                       oldSeg.storeTo(container);
+                                       container.ext().store(segments, 1);
+                                       container.ext().store(SegmentedBucketChainBucket.this, 1);
+                                       container.deactivate(oldSeg, 1);
+                                       dbJobRunner.queueRestartJob(killMe, NativeThread.HIGH_PRIORITY, container);
+                               }
+                               
+                       }, NativeThread.NORM_PRIORITY);
+               }
+               SegmentedChainBucketSegment seg = new SegmentedChainBucketSegment(this);
+               if(segments.size() != index) throw new IllegalArgumentException("Asked to add segment "+index+" but segments length is "+segments.size());
+               segments.add(seg);
+               return seg;
+       }
+
+       public boolean isReadOnly() {
+               return readOnly;
+       }
+
+       public void removeFrom(ObjectContainer container) {
+               // FIXME do something
+       }
+
+       public void setReadOnly() {
+               readOnly = true;
+       }
+
+       public synchronized long size() {
+               return size;
+       }
+
+       /**
+        * Note that we don't recurse inside the segments, as it would produce a huge
+        * transaction. So you will need to close the OutputStream to commit the
+        * progress of writing to a file. And yes, we can't append. So you need to
+        * write everything before storing the bucket.
+        */
+       public void storeTo(ObjectContainer container) {
+               stored = true;
+               dbJobRunner.removeRestartJob(killMe, NativeThread.HIGH_PRIORITY, container);
+               container.ext().store(segments, 1);
+               container.ext().store(this, 1);
+       }
+       
+       public Bucket[] getBuckets() {
+               final BucketArrayWrapper baw = new BucketArrayWrapper();
+               dbJobRunner.runBlocking(new DBJob() {
+
+                       public void run(ObjectContainer container, ClientContext context) {
+                               baw.buckets = getBuckets(container);
+                       }
+                       
+               }, NativeThread.HIGH_PRIORITY);
+               return baw.buckets;
+       }
+
+       protected synchronized Bucket[] getBuckets(ObjectContainer container) {
+               int segs = segments.size();
+               if(segs == 0) return new Bucket[0];
+               SegmentedChainBucketSegment seg = segments.get(segs-1);
+               container.activate(seg, 1);
+               seg.activateBuckets(container);
+               int size = (segs - 1) * segmentSize + seg.size();
+               Bucket[] buckets = new Bucket[size];
+               Bucket[] temp = seg.shallowCopyBuckets();
+               container.deactivate(seg, 1);
+               System.arraycopy(temp, 0, buckets, (segs-1)*segmentSize, temp.length);
+               temp = null;
+               int pos = 0;
+               for(int i=0;i<(segs-1);i++) {
+                       seg = segments.get(i);
+                       container.activate(seg, 1);
+                       seg.activateBuckets(container);
+                       temp = seg.shallowCopyBuckets();
+                       container.deactivate(seg, 1);
+                       System.arraycopy(temp, 0, buckets, pos, segmentSize);
+                       pos += segmentSize;
+               }
+               return buckets;
+       }
+
+       public synchronized void clear() {
+               segments.clear();
+       }
+
+}
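
A minimal end-to-end sketch of the intended usage (illustrative; bf and runner
stand in for a real BucketFactory and DBJobRunner; note the storeTo() javadoc:
there is no appending, so all data must be written and the stream closed
before the bucket is stored):

    // Illustrative usage, not part of this commit.
    SegmentedBucketChainBucket bucket =
            new SegmentedBucketChainBucket(32768, bf, runner, 128);
    OutputStream os = bucket.getOutputStream();
    os.write(myData);    // full segments are stored to the database and deactivated as you go
    os.close();          // commits the final segment
    // Later, inside a DBJob: bucket.storeTo(container);
    InputStream is = bucket.getInputStream();
    // ... read the contents back ...
    is.close();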

Added: branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucketKillJob.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucketKillJob.java	                     (rev 0)
+++ branches/db4o/freenet/src/freenet/support/io/SegmentedBucketChainBucketKillJob.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -0,0 +1,23 @@
+package freenet.support.io;
+
+import com.db4o.ObjectContainer;
+
+import freenet.client.async.ClientContext;
+import freenet.client.async.DBJob;
+
+public class SegmentedBucketChainBucketKillJob implements DBJob {
+       
+       final SegmentedBucketChainBucket bcb;
+
+       public SegmentedBucketChainBucketKillJob(SegmentedBucketChainBucket bucket) {
+               bcb = bucket;
+       }
+
+       public void run(ObjectContainer container, ClientContext context) {
+               container.activate(bcb, 1);
+               if(bcb.stored) return;
+               System.err.println("Freeing unfinished unstored bucket "+this);
+               bcb.removeFrom(container);
+       }
+
+}

Added: branches/db4o/freenet/src/freenet/support/io/SegmentedChainBucketSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/SegmentedChainBucketSegment.java	                        (rev 0)
+++ branches/db4o/freenet/src/freenet/support/io/SegmentedChainBucketSegment.java	2008-10-22 12:49:39 UTC (rev 23033)
@@ -0,0 +1,60 @@
+package freenet.support.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import com.db4o.ObjectContainer;
+
+import freenet.support.api.Bucket;
+
+public class SegmentedChainBucketSegment {
+       
+       private final ArrayList<Bucket> buckets;
+       private final SegmentedBucketChainBucket bcb;
+
+       public SegmentedChainBucketSegment(SegmentedBucketChainBucket bucket) {
+               this.bcb = bucket;
+               this.buckets = new ArrayList<Bucket>();
+       }
+
+       public void free() {
+               for(Bucket bucket : buckets)
+                       bucket.free();
+       }
+
+       public void storeTo(ObjectContainer container) {
+               for(Bucket bucket : buckets)
+                       bucket.storeTo(container);
+               container.ext().store(buckets, 1);
+               container.ext().store(this, 1);
+       }
+
+       public synchronized Bucket[] shallowCopyBuckets() {
+               int sz = buckets.size();
+               Bucket[] out = new Bucket[sz];
+               for(int i=0;i<sz;i++) out[i] = buckets.get(i);
+               return out;
+       }
+
+       public OutputStream makeBucketStream(int bucketNo) throws IOException {
+               if(bucketNo >= bcb.segmentSize)
+                       throw new IllegalArgumentException("Too many buckets in segment");
+               Bucket b = bcb.bf.makeBucket(bcb.bucketSize);
+               synchronized(this) {
+                       if(buckets.size() != bucketNo)
+                               throw new IllegalArgumentException("Next bucket should be "+buckets.size()+" but is "+bucketNo);
+                       buckets.add(b);
+               }
+               return b.getOutputStream();
+       }
+
+       public int size() {
+               return buckets.size();
+       }
+       
+       void activateBuckets(ObjectContainer container) {
+               container.activate(buckets, 1);
+       }
+
+}

