Author: toad
Date: 2008-01-19 00:02:16 +0000 (Sat, 19 Jan 2008)
New Revision: 17161

Added:
   trunk/freenet/src/freenet/support/io/BucketChainBucket.java
Log:
BucketChainBucket: a chain of buckets pretending to be a bucket.
Could be useful for compressing direct to a chain of buckets when doing an 
upload.

Added: trunk/freenet/src/freenet/support/io/BucketChainBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/io/BucketChainBucket.java                 
        (rev 0)
+++ trunk/freenet/src/freenet/support/io/BucketChainBucket.java 2008-01-19 
00:02:16 UTC (rev 17161)
@@ -0,0 +1,262 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.support.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Vector;
+
+import freenet.support.api.Bucket;
+import freenet.support.api.BucketFactory;
+
+public class BucketChainBucket implements Bucket {
+       
+       private final Vector buckets;
+       private long bucketSize;
+       private long size;
+       private boolean freed;
+       private boolean readOnly;
+       private final BucketFactory bf;
+       
+       public BucketChainBucket(long bucketSize, BucketFactory bf) {
+               this.bucketSize = bucketSize;
+               this.buckets = new Vector();
+               this.bf = bf;
+               size = 0;
+               freed = false;
+               readOnly = false;
+       }
+
+       public void free() {
+               Bucket[] list;
+               synchronized(this) {
+                       list = getBuckets();
+                       freed = true;
+                       buckets.clear();
+               }
+               for(int i=0;i<list.length;i++) {
+                       list[i].free();
+               }
+       }
+
+       public synchronized Bucket[] getBuckets() {
+               return (Bucket[]) buckets.toArray(new Bucket[buckets.size()]);
+       }
+
+       public InputStream getInputStream() throws IOException {
+               synchronized(this) {
+                       if(freed) throw new IOException("Freed");
+               }
+               return new InputStream() {
+
+                       private int bucketNo = 0;
+                       private InputStream curBucketStream = 
getBucketInputStream(0);
+                       private long readBytes;
+                       
+                       public int read() throws IOException {
+                               synchronized(BucketChainBucket.this) {
+                                       if(freed) {
+                                               curBucketStream.close();
+                                               curBucketStream = null;
+                                               throw new IOException("Freed");
+                                       }
+                               }
+                               while(true) {
+                                       if(curBucketStream == null) {
+                                               curBucketStream = 
getBucketInputStream(0);
+                                               if(curBucketStream == null) {
+                                                       return -1;
+                                               }
+                                       }
+                                       try {
+                                               int x = curBucketStream.read();
+                                               if(x >= 0) {
+                                                       readBytes++;
+                                                       return x;
+                                               }
+                                       } catch (EOFException e) {
+                                               // Handle the same
+                                       }
+                                       bucketNo++;
+                                       curBucketStream.close();
+                                       curBucketStream = 
getBucketInputStream(bucketNo++);
+                               }
+                       }
+                       
+                       public int read(byte[] buf) throws IOException {
+                               synchronized(BucketChainBucket.this) {
+                                       if(freed) {
+                                               curBucketStream.close();
+                                               curBucketStream = null;
+                                               throw new IOException("Freed");
+                                       }
+                               }
+                               return read(buf, 0, buf.length);
+                       }
+                       
+                       public int read(byte[] buf, int offset, int length) 
throws IOException {
+                               synchronized(BucketChainBucket.this) {
+                                       if(freed) {
+                                               curBucketStream.close();
+                                               curBucketStream = null;
+                                               throw new IOException("Freed");
+                                       }
+                               }
+                               if(length == 0) return 0;
+                               while(true) {
+                                       if(curBucketStream == null) {
+                                               curBucketStream = 
getBucketInputStream(0);
+                                               if(curBucketStream == null) {
+                                                       return -1;
+                                               }
+                                       }
+                                       try {
+                                               int x = 
curBucketStream.read(buf, offset, length);
+                                               if(x > 0) {
+                                                       readBytes += x;
+                                                       return x;
+                                               }
+                                       } catch (EOFException e) {
+                                               // Handle the same
+                                       }
+                                       bucketNo++;
+                                       curBucketStream.close();
+                                       curBucketStream = 
getBucketInputStream(bucketNo++);
+                               }
+                       }
+                       
+                       public int available() throws IOException {
+                               synchronized(BucketChainBucket.this) {
+                                       if(freed) {
+                                               curBucketStream.close();
+                                               curBucketStream = null;
+                                               throw new IOException("Freed");
+                                       }
+                               }
+                               return (int) Math.min(Integer.MAX_VALUE, size() 
- readBytes);
+                       }
+                       
+                       public void close() throws IOException {
+                               if(curBucketStream != null)
+                                       curBucketStream.close();
+                       }
+                       
+               };
+       }
+
+       protected synchronized InputStream getBucketInputStream(int i) throws 
IOException {
+               Bucket bucket = (Bucket) buckets.get(i);
+               if(bucket == null) return null;
+               return bucket.getInputStream();
+       }
+
+       public String getName() {
+               return "BucketChainBucket";
+       }
+
+       public OutputStream getOutputStream() throws IOException {
+               Bucket[] list;
+               synchronized(this) {
+                       if(readOnly) throw new IOException("Read-only");
+                       if(freed) throw new IOException("Freed");
+                       size = 0;
+                       list = getBuckets();
+               }
+               for(int i=0;i<list.length;i++) {
+                       list[i].free();
+               }
+               return new OutputStream() {
+
+                       private int bucketNo = 0;
+                       private OutputStream curBucketStream = 
makeBucketOutputStream(0);
+                       private long bucketLength = 0;
+                       
+                       public void write(int c) throws IOException {
+                               synchronized(BucketChainBucket.this) {
+                                       if(freed) {
+                                               curBucketStream.close();
+                                               curBucketStream = null;
+                                               throw new IOException("Freed");
+                                       }
+                                       if(readOnly) {
+                                               curBucketStream.close();
+                                               curBucketStream = null;
+                                               throw new 
IOException("Read-only");
+                                       }
+                               }
+                               if(bucketLength == bucketSize) {
+                                       curBucketStream.close();
+                                       curBucketStream = 
makeBucketOutputStream(bucketNo++);
+                               }
+                               curBucketStream.write(c);
+                               bucketLength++;
+                               synchronized(BucketChainBucket.this) {
+                                       size++;
+                               }
+                       }
+                       
+                       public void write(byte[] buf) throws IOException {
+                               write(buf, 0, buf.length);
+                       }
+                       
+                       public void write(byte[] buf, int offset, int length) 
throws IOException {
+                               synchronized(BucketChainBucket.this) {
+                                       if(freed) {
+                                               curBucketStream.close();
+                                               curBucketStream = null;
+                                               throw new IOException("Freed");
+                                       }
+                                       if(readOnly) {
+                                               curBucketStream.close();
+                                               curBucketStream = null;
+                                               throw new 
IOException("Read-only");
+                                       }
+                               }
+                               if(length <= 0) return;
+                               if(bucketLength == bucketSize) {
+                                       curBucketStream.close();
+                                       curBucketStream = 
makeBucketOutputStream(bucketNo++);
+                               }
+                               if(bucketLength + length > bucketSize) {
+                                       int split = (int) (bucketSize - 
bucketLength);
+                                       write(buf, offset, split);
+                                       write(buf, offset + split, length - 
split);
+                                       return;
+                               }
+                               curBucketStream.write(buf, offset, length);
+                               bucketLength += length;
+                               synchronized(BucketChainBucket.this) {
+                                       size += length;
+                               }
+                       }
+                       
+                       public void close() throws IOException {
+                               if(curBucketStream != null)
+                                       curBucketStream.close();
+                       }
+                       
+               };
+       }
+
+       protected OutputStream makeBucketOutputStream(int i) throws IOException 
{
+               Bucket bucket = bf.makeBucket(bucketSize);
+               buckets.set(i, bucket);
+               return bucket.getOutputStream();
+       }
+
+       public boolean isReadOnly() {
+               return readOnly;
+       }
+
+       public void setReadOnly() {
+               readOnly = true;
+       }
+
+       public long size() {
+               return size;
+       }
+
+}


Reply via email to