Author: toad
Date: 2009-02-04 18:37:30 +0000 (Wed, 04 Feb 2009)
New Revision: 25547

Modified:
   branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
   branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
Log:
Segment-ise inserts. MAJOR UNTESTED CHANGE. This should significantly reduce 
the amount of database work needed for splitfile inserts, especially for 
starting them.


Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java       
2009-02-04 18:36:32 UTC (rev 25546)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java       
2009-02-04 18:37:30 UTC (rev 25547)
@@ -18,6 +18,7 @@
 import freenet.client.Metadata;
 import freenet.keys.CHKBlock;
 import freenet.keys.ClientCHK;
+import freenet.node.PrioRunnable;
 import freenet.support.Executor;
 import freenet.support.Logger;
 import freenet.support.SimpleFieldSet;
@@ -75,9 +76,9 @@
                fs.put("SegmentSize", segmentSize);
                fs.put("CheckSegmentSize", checkSegmentSize);
                SimpleFieldSet segs = new SimpleFieldSet(false);
-               for(int i=0;i<segments.length;i++) {
-                       segs.put(Integer.toString(i), 
segments[i].getProgressFieldset());
-               }
+//             for(int i=0;i<segments.length;i++) {
+//                     segs.put(Integer.toString(i), 
segments[i].getProgressFieldset());
+//             }
                segs.put("Count", segments.length);
                fs.put("Segments", segs);
                return fs;
@@ -283,7 +284,7 @@
                return (SplitFileInserterSegment[]) segs.toArray(new 
SplitFileInserterSegment[segs.size()]);
        }
        
-       public void start(ObjectContainer container, ClientContext context) 
throws InsertException {
+       public void start(ObjectContainer container, final ClientContext 
context) throws InsertException {
                for(int i=0;i<segments.length;i++) {
                        if(persistent) {
                                container.activate(segments[i], 1);
@@ -291,7 +292,26 @@
                        segJob.schedule(container, context, 
NativeThread.NORM_PRIORITY-1, persistent);
                                container.deactivate(segments[i], 1);
                        } else {
-                               segments[i].start(container, context);
+                               if(!getCHKOnly)
+                                       segments[i].start(container, context);
+                               else {
+                                       final SplitFileInserterSegment seg = 
segments[i];
+                                       context.mainExecutor.execute(new 
PrioRunnable() {
+
+                                               public int getPriority() {
+                                                       return 
NativeThread.NORM_PRIORITY;
+                                               }
+
+                                               public void run() {
+                                                       try {
+                                                               seg.start(null, 
context);
+                                                       } catch 
(InsertException e) {
+                                                               fail(e, null, 
context);
+                                                       }
+                                               }
+                                               
+                                       }, "Schedule segment (get chk only)");
+                               }
                        }
                }
                if(persistent)

Modified: 
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java    
    2009-02-04 18:36:32 UTC (rev 25546)
+++ 
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java    
    2009-02-04 18:37:30 UTC (rev 25547)
@@ -1,36 +1,50 @@
 package freenet.client.async;
 
+import java.io.IOException;
 import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import com.db4o.ObjectContainer;
 
 import freenet.client.FECCallback;
 import freenet.client.FECCodec;
 import freenet.client.FECJob;
-import freenet.client.FECQueue;
 import freenet.client.FailureCodeTracker;
 import freenet.client.InsertContext;
 import freenet.client.InsertException;
 import freenet.client.Metadata;
 import freenet.client.SplitfileBlock;
-import freenet.keys.BaseClientKey;
 import freenet.keys.CHKBlock;
+import freenet.keys.CHKEncodeException;
 import freenet.keys.ClientCHK;
+import freenet.keys.ClientCHKBlock;
 import freenet.keys.ClientKey;
 import freenet.keys.FreenetURI;
+import freenet.node.KeysFetchingLocally;
+import freenet.node.LowLevelPutException;
+import freenet.node.NodeClientCore;
+import freenet.node.RequestClient;
+import freenet.node.RequestScheduler;
+import freenet.node.SendableInsert;
+import freenet.node.SendableRequestItem;
+import freenet.node.SendableRequestSender;
 import freenet.support.Fields;
 import freenet.support.Logger;
 import freenet.support.SimpleFieldSet;
 import freenet.support.api.Bucket;
+import freenet.support.io.BucketTools;
 import freenet.support.io.CannotCreateFromFieldSetException;
-import freenet.support.io.SerializableToFieldSetBucket;
+import freenet.support.io.NativeThread;
 import freenet.support.io.SerializableToFieldSetBucketUtil;
 
-public class SplitFileInserterSegment implements PutCompletionCallback, 
FECCallback {
+public class SplitFileInserterSegment extends SendableInsert implements 
FECCallback, Encodeable {
 
        private static volatile boolean logMINOR;
 
        final SplitFileInserter parent;
+       final BaseClientPutter putter;
 
        final short splitfileAlgo;
 
@@ -42,10 +56,23 @@
 
        final ClientCHK[] checkURIs;
 
-       final SingleBlockInserter[] dataBlockInserters;
-
-       final SingleBlockInserter[] checkBlockInserters;
-
+       final int[] dataRetries;
+       final int[] checkRetries;
+       
+       final int[] dataConsecutiveRNFs;
+       final int[] checkConsecutiveRNFs;
+       
+       /** Block numbers not finished */
+       final ArrayList<Integer> blocks;
+       
+       final boolean[] dataFinished;
+       final boolean[] checkFinished;
+       
+       final boolean[] dataFailed;
+       final boolean[] checkFailed;
+       
+       final int maxRetries;
+       
        final InsertContext blockInsertContext;
 
        final int segNo;
@@ -65,25 +92,17 @@
        private final FailureCodeTracker errors;
 
        private int blocksGotURI;
-
+       private int blocksSucceeded;
        private int blocksCompleted;
        
        private final boolean persistent;
        
-       // A persistent hashCode is helpful in debugging, and also means we can 
put
-       // these objects into sets etc when we need to.
-       
-       private final int hashCode;
-       
-       public int hashCode() {
-               return hashCode;
-       }
 
        public SplitFileInserterSegment(SplitFileInserter parent, boolean 
persistent, BaseClientPutter putter,
                        short splitfileAlgo, int checkBlockCount, Bucket[] 
origDataBlocks,
                        InsertContext blockInsertContext, boolean getCHKOnly, 
int segNo, ObjectContainer container) {
+               super(persistent);
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
-               hashCode = super.hashCode();
                this.parent = parent;
                this.getCHKOnly = getCHKOnly;
                this.persistent = persistent;
@@ -94,11 +113,22 @@
                checkBlocks = new Bucket[checkBlockCount];
                checkURIs = new ClientCHK[checkBlockCount];
                dataURIs = new ClientCHK[origDataBlocks.length];
-               dataBlockInserters = new SingleBlockInserter[dataBlocks.length];
-               checkBlockInserters = new 
SingleBlockInserter[checkBlocks.length];
+               dataRetries = new int[origDataBlocks.length];
+               checkRetries = new int[checkBlockCount];
+               dataFinished = new boolean[origDataBlocks.length];
+               checkFinished = new boolean[checkBlockCount];
+               dataFailed = new boolean[origDataBlocks.length];
+               checkFailed = new boolean[checkBlockCount];
+               dataConsecutiveRNFs = new int[origDataBlocks.length];
+               checkConsecutiveRNFs = new int[checkBlockCount];
+               blocks = new ArrayList<Integer>();
                putter.addBlocks(dataURIs.length + checkURIs.length, container);
                putter.addMustSucceedBlocks(dataURIs.length + checkURIs.length, 
container);
                this.segNo = segNo;
+               if(persistent) container.activate(blockInsertContext, 1);
+               maxRetries = blockInsertContext.maxInsertRetries;
+               this.putter = putter;
+               
        }
 
        /**
@@ -109,12 +139,13 @@
        public SplitFileInserterSegment(SplitFileInserter parent, boolean 
persistent, BaseClientPutter putter,
                        SimpleFieldSet fs, short splitfileAlgorithm, 
InsertContext ctx,
                        boolean getCHKOnly, int segNo, ClientContext context, 
ObjectContainer container) throws ResumeException {
-               hashCode = super.hashCode();
+               super(persistent);
                this.parent = parent;
                this.splitfileAlgo = splitfileAlgorithm;
                this.getCHKOnly = getCHKOnly;
                this.persistent = persistent;
                this.blockInsertContext = ctx;
+               this.maxRetries = ctx.maxInsertRetries;
                this.segNo = segNo;
                if (!"SplitFileInserterSegment".equals(fs.get("Type")))
                        throw new ResumeException("Wrong Type: " + 
fs.get("Type"));
@@ -148,7 +179,12 @@
 
                dataBlocks = new Bucket[dataBlockCount];
                dataURIs = new ClientCHK[dataBlockCount];
-               dataBlockInserters = new SingleBlockInserter[dataBlockCount];
+               dataRetries = new int[dataBlockCount];
+               dataConsecutiveRNFs = new int[dataBlockCount];
+               dataFinished = new boolean[dataBlockCount];
+               dataFailed = new boolean[dataBlockCount];
+               blocks = new ArrayList<Integer>();
+               this.putter = putter;
 
                // Check blocks first, because if there are missing check 
blocks, we
                // need
@@ -169,7 +205,10 @@
                        }
                        checkBlocks = new Bucket[checkBlockCount];
                        checkURIs = new ClientCHK[checkBlockCount];
-                       checkBlockInserters = new 
SingleBlockInserter[checkBlockCount];
+                       checkRetries = new int[checkBlockCount];
+                       checkConsecutiveRNFs = new int[checkBlockCount];
+                       checkFinished = new boolean[checkBlockCount];
+                       checkFailed = new boolean[checkBlockCount];
                        for (int i = 0; i < checkBlockCount; i++) {
                                String index = Integer.toString(i);
                                SimpleFieldSet blockFS = checkFS.subset(index);
@@ -252,7 +291,10 @@
                        int checkBlocksCount =splitfileAlgo.countCheckBlocks();
                        this.checkURIs = new ClientCHK[checkBlocksCount];
                        this.checkBlocks = new Bucket[checkBlocksCount];
-                       this.checkBlockInserters = new 
SingleBlockInserter[checkBlocksCount];
+                       checkRetries = new int[checkBlocksCount];
+                       checkConsecutiveRNFs = new int[checkBlocksCount];
+                       checkFinished = new boolean[checkBlocksCount];
+                       checkFailed = new boolean[checkBlocksCount];
                        hasURIs = false;
                }
 
@@ -328,87 +370,6 @@
                putter.addMustSucceedBlocks(dataURIs.length + checkURIs.length, 
container);
        }
 
-       public synchronized SimpleFieldSet getProgressFieldset() {
-               SimpleFieldSet fs = new SimpleFieldSet(false); // these get BIG
-               fs.putSingle("Type", "SplitFileInserterSegment");
-               fs.put("Finished", finished);
-               // If true, check blocks which are null are finished
-               fs.put("Encoded", encoded);
-               // If true, data blocks which are null are finished
-               fs.put("Started", started);
-               fs.tput("Errors", errors.toFieldSet(false));
-               SimpleFieldSet dataFS = new SimpleFieldSet(false);
-               dataFS.put("Count", dataBlocks.length);
-               for (int i = 0; i < dataBlocks.length; i++) {
-                       SimpleFieldSet block = new SimpleFieldSet(false);
-                       if (dataURIs[i] != null)
-                               block.putSingle("URI", 
dataURIs[i].getURI().toString());
-                       SingleBlockInserter sbi = dataBlockInserters[i];
-                       // If started, then sbi = null => block finished.
-                       boolean finished = started && sbi == null;
-                       if (started) {
-                               block.put("Finished", finished);
-                       }
-                       Bucket data = dataBlocks[i];
-                       if (data == null && finished) {
-                               // Ignore
-                               if (logMINOR)
-                                       Logger.minor(this, "Could not save to 
disk bucket: null");
-                       } else if (data instanceof 
SerializableToFieldSetBucket) {
-                               SimpleFieldSet tmp = 
((SerializableToFieldSetBucket) data).toFieldSet();
-                               if (tmp == null) {
-                                       if (logMINOR)
-                                               Logger.minor(this, "Could not 
save to disk data: " + data);
-                                       return null;
-                               }
-                               block.put("Data", tmp);
-                       } else {
-                               if (logMINOR)
-                                       Logger.minor(this,
-                                                       "Could not save to disk 
(not serializable to fieldset): " + data);
-                               return null;
-                       }
-                       if (!block.isEmpty())
-                               dataFS.put(Integer.toString(i), block);
-               }
-               fs.put("DataBlocks", dataFS);
-               SimpleFieldSet checkFS = new SimpleFieldSet(false);
-               checkFS.put("Count", checkBlocks.length);
-               for (int i = 0; i < checkBlocks.length; i++) {
-                       SimpleFieldSet block = new SimpleFieldSet(false);
-                       if (checkURIs[i] != null)
-                               block.putSingle("URI", 
checkURIs[i].getURI().toString());
-                       SingleBlockInserter sbi = checkBlockInserters[i];
-                       // If encoded, then sbi == null => block finished
-                       boolean finished = encoded && sbi == null && 
checkURIs[i] != null;
-                       if (encoded) {
-                               block.put("Finished", finished);
-                       }
-                       if (!finished) {
-                               Bucket data = checkBlocks[i];
-                               if (data != null
-                                               && data instanceof 
SerializableToFieldSetBucket) {
-                                       SimpleFieldSet tmp = 
((SerializableToFieldSetBucket) data)
-                                                       .toFieldSet();
-                                       if (tmp == null)
-                                               Logger.error(this, "Could not 
serialize " + data
-                                                               + " - check 
block " + i + " of " + segNo);
-                                       else
-                                               block.put("Data", tmp);
-                               } else if (encoded) {
-                                       Logger.error(this,
-                                                       "Could not save to disk 
(null or not serializable to fieldset) encoded="+encoded+" finished="+finished 
+ " checkURI[i]="+checkURIs[i]+" : "
-                                                                       + data, 
new Exception());
-                                       return null;
-                               }
-                       }
-                       if (!block.isEmpty())
-                               checkFS.put(Integer.toString(i), block);
-               }
-               fs.put("CheckBlocks", checkFS);
-               return fs;
-       }
-
        public void start(ObjectContainer container, ClientContext context) 
throws InsertException {
                // Always called by parent, so don't activate or deactivate 
parent.
                if(persistent) {
@@ -424,16 +385,12 @@
                }
                boolean fin = true;
 
-               for (int i = 0; i < dataBlockInserters.length; i++) {
+               for (int i = 0; i < dataBlocks.length; i++) {
                        if (dataBlocks[i] != null) { // else already finished 
on creation
-                               dataBlockInserters[i] = new 
SingleBlockInserter(parent.parent,
-                                               dataBlocks[i], (short) -1, 
FreenetURI.EMPTY_CHK_URI,
-                                               blockInsertContext, this, 
false, CHKBlock.DATA_LENGTH,
-                                               i, getCHKOnly, false, false, 
parent.token, container, context, persistent, false);
-                               dataBlockInserters[i].schedule(container, 
context);
-                               if(persistent)
-                                       
container.deactivate(dataBlockInserters[i], 1);
                                fin = false;
+                               synchronized(this) {
+                                       blocks.add(i);
+                               }
                        } else {
                                parent.parent.completedBlock(true, container, 
context);
                        }
@@ -464,16 +421,11 @@
                                }                               
                                fin = false;
                } else {
-                       for (int i = 0; i < checkBlockInserters.length; i++) {
+                       for (int i = 0; i < checkBlocks.length; i++) {
                                if (checkBlocks[i] != null) {
-                                       checkBlockInserters[i] = new 
SingleBlockInserter(
-                                                       parent.parent, 
checkBlocks[i], (short) -1,
-                                                       
FreenetURI.EMPTY_CHK_URI, blockInsertContext, this,
-                                                       false, 
CHKBlock.DATA_LENGTH, i + dataBlocks.length,
-                                                       getCHKOnly, false, 
false, parent.token, container, context, persistent, false);
-                                       
checkBlockInserters[i].schedule(container, context);
-                                       if(persistent)
-                                               
container.deactivate(checkBlockInserters[i], 1);
+                                       synchronized(this) {
+                                               blocks.add(i + 
dataBlocks.length);
+                                       }
                                        fin = false;
                                } else
                                        parent.parent.completedBlock(true, 
container, context);
@@ -493,6 +445,8 @@
                        parent.segmentFetchable(this, container);
                if (fin)
                        finish(container, context, parent);
+               else
+                       schedule(container, context);
                if (finished) {
                        parent.segmentFinished(this, container, context);
                }
@@ -501,6 +455,47 @@
                }
        }
 
+       private void schedule(ObjectContainer container, ClientContext context) 
{
+               if(!getCHKOnly) {
+                       this.getScheduler(context).registerInsert(this, 
persistent, false, container);
+               } else {
+                       tryEncode(container, context);
+               }
+       }
+
+       public void tryEncode(ObjectContainer container, ClientContext context) 
{
+               for(int i=0;i<dataBlocks.length;i++) {
+                       if(dataURIs[i] == null && dataBlocks[i] != null) {
+                               try {
+                                       ClientCHK key = (ClientCHK) 
encodeBucket(dataBlocks[i]).getClientKey();
+                                       onEncode(i, key, container, context);
+                               } catch (CHKEncodeException e) {
+                                       fail(new 
InsertException(InsertException.INTERNAL_ERROR, e, null), container, context);  
                                       
+                               } catch (IOException e) {
+                                       fail(new 
InsertException(InsertException.BUCKET_ERROR, e, null), container, context);
+                               }
+                       } else if(dataURIs[i] == null && dataBlocks[i] == null) 
{
+                               fail(new 
InsertException(InsertException.INTERNAL_ERROR, "Data block "+i+" cannot be 
encoded: no data", null), container, context);                                  
   
+                       }
+               }
+               if(encoded) {
+                       for(int i=0;i<checkBlocks.length;i++) {
+                               if(checkURIs[i] == null && checkBlocks[i] != 
null) {
+                                       try {
+                                               ClientCHK key = (ClientCHK) 
encodeBucket(checkBlocks[i]).getClientKey();
+                                               onEncode(i, key, container, 
context);
+                                       } catch (CHKEncodeException e) {
+                                               fail(new 
InsertException(InsertException.INTERNAL_ERROR, e, null), container, context);  
                                       
+                                       } catch (IOException e) {
+                                               fail(new 
InsertException(InsertException.BUCKET_ERROR, e, null), container, context);
+                                       }
+                               } else if(checkURIs[i] == null && 
checkBlocks[i] == null) {
+                                       fail(new 
InsertException(InsertException.INTERNAL_ERROR, "Data block "+i+" cannot be 
encoded: no data", null), container, context);                                  
   
+                               }
+                       }
+               }
+       }
+
        public void onDecodedSegment(ObjectContainer container, ClientContext 
context, FECJob job, Bucket[] dataBuckets, Bucket[] checkBuckets, 
SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus) {} // 
irrevelant
 
        public void onEncodedSegment(ObjectContainer container, ClientContext 
context, FECJob job, Bucket[] dataBuckets, Bucket[] checkBuckets, 
SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus) {
@@ -534,8 +529,8 @@
                // Start the inserts
                try {
                        if(logMINOR)
-                               Logger.minor(this, "Scheduling 
"+checkBlockInserters.length+" check blocks...");
-                       for (int i = 0; i < checkBlockInserters.length; i++) {
+                               Logger.minor(this, "Scheduling 
"+checkBlocks.length+" check blocks...");
+                       for (int i = 0; i < checkBlocks.length; i++) {
                                // See comments on FECCallback: WE MUST COPY 
THE DATA BACK!!!
                                checkBlocks[i] = checkBuckets[i];
                                if(checkBlocks[i] == null) {
@@ -545,18 +540,15 @@
                                }
                                if(persistent)
                                        checkBlocks[i].storeTo(container);
-                               if(checkBlockInserters[i] != null) continue;
-                               checkBlockInserters[i] = new 
SingleBlockInserter(parent.parent,
-                                               checkBlocks[i], (short) -1, 
FreenetURI.EMPTY_CHK_URI,
-                                               blockInsertContext, this, 
false, CHKBlock.DATA_LENGTH,
-                                               i + dataBlocks.length, 
getCHKOnly, false, false,
-                                               parent.token, container, 
context, persistent, false);
-                               checkBlockInserters[i].schedule(container, 
context);
                                if(persistent) {
-                                       
container.deactivate(checkBlockInserters[i], 1);
                                        container.deactivate(checkBlocks[i], 1);
                                }
                        }
+                       synchronized(this) {
+                               for(int i=0;i<checkBlocks.length;i++)
+                                       blocks.add(dataBlocks.length + i);
+                       }
+                       schedule(container, context);
                } catch (Throwable t) {
                        Logger.error(this, "Caught " + t + " while encoding " + 
this, t);
                        InsertException ex = new InsertException(
@@ -579,8 +571,8 @@
                parent.encodedSegment(this, container, context);
 
                synchronized (this) {
-                       for (int i = 0; i < dataBlockInserters.length; i++) {
-                               if (dataBlockInserters[i] == null && 
dataBlocks[i] != null) {
+                       for (int i = 0; i < dataBlocks.length; i++) {
+                               if (dataFinished[i] && dataBlocks[i] != null) {
                                        if(persistent) 
container.activate(dataBlocks[i], 1);
                                        dataBlocks[i].free();
                                        if(persistent)
@@ -640,11 +632,8 @@
                parent.segmentFinished(this, container, context);
        }
 
-       public void onEncode(BaseClientKey k, ClientPutState state, 
ObjectContainer container, ClientContext context) {
-               ClientCHK key = (ClientCHK) k;
-               SingleBlockInserter sbi = (SingleBlockInserter) state;
-               int x = sbi.token;
-               if(logMINOR) Logger.minor(this, "Encoded block "+x+" on 
"+this+" : "+sbi);
+       private void onEncode(int x, ClientCHK key, ObjectContainer container, 
ClientContext context) {
+               if(logMINOR) Logger.minor(this, "Encoded block "+x+" on "+this);
                synchronized (this) {
                        if (finished)
                                return;
@@ -690,113 +679,6 @@
                        container.deactivate(parent, 1);
        }
 
-       public void onSuccess(ClientPutState state, ObjectContainer container, 
ClientContext context) {
-               if(persistent) {
-                       container.activate(parent, 1);
-                       container.activate(parent.parent, 1);
-               }
-               if (parent.parent.isCancelled()) {
-                       parent.cancel(container, context);
-                       return;
-               }
-               SingleBlockInserter sbi = (SingleBlockInserter) state;
-               int x = sbi.token;
-               completed(x, container, context);
-               if(persistent) {
-                       container.deactivate(parent.parent, 1);
-                       container.deactivate(parent, 1);
-               }
-       }
-
-       public void onFailure(InsertException e, ClientPutState state, 
ObjectContainer container, ClientContext context) {
-               if(persistent) {
-                       container.activate(parent, 1);
-                       container.activate(parent.parent, 1);
-                       container.activate(errors, 1);
-               }
-               if (parent.parent.isCancelled()) {
-                       parent.cancel(container, context);
-                       return;
-               }
-               SingleBlockInserter sbi = (SingleBlockInserter) state;
-               int x = sbi.token;
-               errors.merge(e);
-               completed(x, container, context);
-               if(persistent) {
-                       container.deactivate(parent.parent, 1);
-                       container.deactivate(parent, 1);
-                       container.deactivate(errors, 1);
-               }
-       }
-
-       private void completed(int x, ObjectContainer container, ClientContext 
context) {
-               int total = innerCompleted(x, container);
-               if (total == -1)
-                       return;
-               if (total == dataBlockInserters.length) {
-                       if(persistent)
-                               container.activate(parent, 1);
-                       parent.segmentFetchable(this, container);
-               }
-               if (total != dataBlockInserters.length + 
checkBlockInserters.length)
-                       return;
-               if(persistent)
-                       container.store(this);
-               finish(container, context, parent);
-       }
-
-       /**
-        * Called when a block has completed.
-        * 
-        * @param x
-        *            The block number.
-        * @return -1 if the segment has already finished, otherwise the number 
of
-        *         completed blocks.
-        */
-       private synchronized int innerCompleted(int x, ObjectContainer 
container) {
-               if (logMINOR)
-                       Logger.minor(this, "Completed: " + x + " on " + this
-                                       + " ( completed=" + blocksCompleted + 
", total="
-                                       + (dataBlockInserters.length + 
checkBlockInserters.length));
-
-               if (finished)
-                       return -1;
-               if (x >= dataBlocks.length) {
-                       x -= dataBlocks.length;
-                       if (checkBlockInserters[x] == null) {
-                               Logger.error(this, "Completed twice: check 
block " + x + " on "
-                                               + this, new Exception());
-                               return blocksCompleted;
-                       }
-                       checkBlockInserters[x] = null;
-                       if(persistent)
-                               container.activate(checkBlocks[x], 1);
-                       checkBlocks[x].free();
-                       if(persistent)
-                               checkBlocks[x].removeFrom(container);
-                       checkBlocks[x] = null;
-               } else {
-                       if (dataBlockInserters[x] == null) {
-                               Logger.error(this, "Completed twice: data block 
" + x + " on "
-                                               + this, new Exception());
-                               return blocksCompleted;
-                       }
-                       dataBlockInserters[x] = null;
-                       if (encoded) {
-                               if(persistent)
-                                       container.activate(dataBlocks[x], 1);
-                               dataBlocks[x].free();
-                               if(persistent)
-                                       dataBlocks[x].removeFrom(container);
-                               dataBlocks[x] = null;
-                       }
-               }
-               blocksCompleted++;
-               if(persistent)
-                       container.store(this);
-               return blocksCompleted;
-       }
-
        public synchronized boolean isFinished() {
                return finished;
        }
@@ -839,37 +721,7 @@
        }
 
        private void cancelInner(ObjectContainer container, ClientContext 
context) {
-               for (int i = 0; i < dataBlockInserters.length; i++) {
-                       SingleBlockInserter sbi = dataBlockInserters[i];
-                       if(persistent)
-                               container.activate(sbi, 1);
-                       if (sbi != null)
-                               sbi.cancel(container, context);
-                       Bucket d = dataBlocks[i];
-                       if (d != null) {
-                               if(persistent)
-                                       container.activate(d, 5);
-                               d.free();
-                               if(persistent)
-                                       d.removeFrom(container);
-                               dataBlocks[i] = null;
-                       }
-               }
-               for (int i = 0; i < checkBlockInserters.length; i++) {
-                       SingleBlockInserter sbi = checkBlockInserters[i];
-                       if(persistent)
-                               container.activate(sbi, 1);
-                       if (sbi != null)
-                               sbi.cancel(container, context);
-                       Bucket d = checkBlocks[i];
-                       if (d != null) {
-                               if(persistent)
-                                       container.activate(d, 5);
-                               d.free();
-                               d.removeFrom(container);
-                               checkBlocks[i] = null;
-                       }
-               }
+               super.unregister(container, context);
                if(persistent) {
                        container.store(this);
                        container.activate(parent, 1);
@@ -909,10 +761,22 @@
         * encoded ASAP.
         */
        public void forceEncode(ObjectContainer container, ClientContext 
context) {
-               context.backgroundBlockEncoder.queue(dataBlockInserters, 
container, context);
-               context.backgroundBlockEncoder.queue(checkBlockInserters, 
container, context);
+               context.backgroundBlockEncoder.queue(this, container, context);
        }
 
+       public void fail(InsertException e, ObjectContainer container, 
ClientContext context) {
+               synchronized(this) {
+                       if(finished) {
+                               Logger.error(this, "Failing but already 
finished on "+this);
+                               return;
+                       }
+                       finished = true;
+                       Logger.error(this, "Insert segment failed: "+e+" for 
"+this, e);
+                       this.toThrow = e;
+               }
+               cancelInner(container, context);
+       }
+       
        public void onFailed(Throwable t, ObjectContainer container, 
ClientContext context) {
                synchronized(this) {
                        if(finished) {
@@ -925,4 +789,563 @@
                }
                cancelInner(container, context);
        }
+
+       Bucket getBucket(int blockNum) {
+               if(blockNum > dataBlocks.length)
+                       return checkBlocks[blockNum - dataBlocks.length];
+               else
+                       return dataBlocks[blockNum];
+       }
+       
+       private BlockItem getBlockItem(ObjectContainer container, ClientContext 
context, int blockNum) throws IOException {
+               Bucket sourceData = getBucket(blockNum);
+               boolean deactivateBucket = false;
+               if(persistent) {
+                       deactivateBucket = 
!container.ext().isActive(sourceData);
+                       if(deactivateBucket)
+                               container.activate(sourceData, 1);
+               }
+               Bucket data = sourceData.createShadow();
+               if(data == null) {
+                       data = 
context.tempBucketFactory.makeBucket(sourceData.size());
+                       BucketTools.copy(sourceData, data);
+               }
+               if(persistent) {
+                       if(deactivateBucket)
+                               container.deactivate(sourceData, 1);
+               }
+               return new BlockItem(this, blockNum, data, persistent);
+       }
+       
+       private int hashCodeForBlock(int blockNum) {
+               // FIXME: Standard hashCode() pattern assumes both inputs are 
evenly
+               // distributed ... this is not true here.
+               return hashCode() * (blockNum + 1);
+       }
+       
+       private static class BlockItem implements SendableRequestItem {
+               
+               private final boolean persistent;
+               private final Bucket copyBucket;
+               private final int hashCode;
+               /** STRICTLY for purposes of equals() !!! */
+               private final SplitFileInserterSegment parent;
+               private final int blockNum;
+               
+               BlockItem(SplitFileInserterSegment parent, int blockNum, Bucket 
bucket, boolean persistent) throws IOException {
+                       this.parent = parent;
+                       this.blockNum = blockNum;
+                       this.copyBucket = bucket;
+                       this.hashCode = parent.hashCodeForBlock(blockNum);
+                       this.persistent = persistent;
+               }
+               
+               public void dump() {
+                       copyBucket.free();
+               }
+               
+               public int hashCode() {
+                       return hashCode;
+               }
+               
+               public boolean equals(Object o) {
+                       if(o instanceof BlockItem) {
+                               if(((BlockItem)o).parent == parent && 
((BlockItem)o).blockNum == blockNum) return true;
+                       } else if(o instanceof FakeBlockItem) {
+                               if(((FakeBlockItem)o).getParent() == parent && 
((FakeBlockItem)o).blockNum == blockNum) return true;
+                       }
+                       return false;
+               }
+               
+       }
+       
+       // Used for testing whether a block is already queued.
+       private class FakeBlockItem implements SendableRequestItem {
+               
+               private final int blockNum;
+               private final int hashCode;
+               
+               FakeBlockItem(int blockNum) {
+                       this.blockNum = blockNum;
+                       this.hashCode = hashCodeForBlock(blockNum);
+                       
+               }
+               
+               public void dump() {
+                       // Do nothing
+               }
+               
+               public SplitFileInserterSegment getParent() {
+                       return SplitFileInserterSegment.this;
+               }
+
+               @Override
+               public int hashCode() {
+                       return hashCode;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if(o instanceof BlockItem) {
+                               if(((BlockItem)o).parent == 
SplitFileInserterSegment.this && ((BlockItem)o).blockNum == blockNum) return 
true;
+                       } else if(o instanceof FakeBlockItem) {
+                               if(((FakeBlockItem)o).getParent() == 
SplitFileInserterSegment.this && ((FakeBlockItem)o).blockNum == blockNum) 
return true;
+                       }
+                       return false;
+               }
+       }
+       
+       @Override
+       public void onFailure(LowLevelPutException e, Object keyNum, 
ObjectContainer container, ClientContext context) {
+               BlockItem block = (BlockItem) keyNum;
+               // First report the error.
+               if(persistent)
+                       container.activate(errors, 5);
+               switch(e.code) {
+               case LowLevelPutException.COLLISION:
+                       Logger.error(this, "Collision on a CHK?!?!?");
+                       fail(new 
InsertException(InsertException.INTERNAL_ERROR, "Collision on a CHK", null), 
container, context);
+                       return;
+               case LowLevelPutException.INTERNAL_ERROR:
+                       errors.inc(InsertException.INTERNAL_ERROR);
+                       break;
+               case LowLevelPutException.REJECTED_OVERLOAD:
+                       errors.inc(InsertException.REJECTED_OVERLOAD);
+                       break;
+               case LowLevelPutException.ROUTE_NOT_FOUND:
+                       errors.inc(InsertException.ROUTE_NOT_FOUND);
+                       break;
+               case LowLevelPutException.ROUTE_REALLY_NOT_FOUND:
+                       errors.inc(InsertException.ROUTE_REALLY_NOT_FOUND);
+                       break;
+               default:
+                       Logger.error(this, "Unknown LowLevelPutException code: 
"+e.code);
+                       errors.inc(InsertException.INTERNAL_ERROR);
+               }
+               if(persistent)
+                       container.store(errors);
+               boolean isRNF = e.code == LowLevelPutException.ROUTE_NOT_FOUND 
||
+                       e.code == LowLevelPutException.ROUTE_REALLY_NOT_FOUND;
+               int blockNum = block.blockNum;
+               boolean treatAsSuccess = false;
+               boolean failedBlock = false;
+               int completed;
+               int succeeded;
+               synchronized(this) {
+                       if(blockNum > dataBlocks.length) {
+                               // Check block.
+                               int checkNum = blockNum = dataBlocks.length;
+                               if(checkFinished[checkNum]) {
+                                       if(checkFailed[checkNum])
+                                               Logger.error(this, "Got 
onFailure() but block has already failed! Check block "+checkNum+" on "+this);
+                                       else
+                                               Logger.error(this, "Got 
onFailure() but block has already succeeded: Check block "+checkNum+" on 
"+this);
+                                       return;
+                               }
+                               if(isRNF) {
+                                       checkConsecutiveRNFs[checkNum]++;
+                                       if(persistent) 
container.activate(blockInsertContext, 1);
+                                       if(logMINOR) Logger.minor(this, 
"Consecutive RNFs: "+checkConsecutiveRNFs[checkNum]+" / 
"+blockInsertContext.consecutiveRNFsCountAsSuccess);
+                                       if(checkConsecutiveRNFs[checkNum] == 
blockInsertContext.consecutiveRNFsCountAsSuccess) {
+                                               // Treat as success
+                                               treatAsSuccess = true;
+                                       }
+                               } else {
+                                       checkConsecutiveRNFs[checkNum] = 0;
+                               }
+                               if(!treatAsSuccess) {
+                                       checkRetries[checkNum]++;
+                                       if(checkRetries[checkNum] > maxRetries 
&& maxRetries != -1) {
+                                               failedBlock = true;
+                                               // Treat as failed.
+                                               checkFinished[checkNum] = true;
+                                               checkFailed[checkNum] = true;
+                                               blocksCompleted++;
+                                               if(checkBlocks[checkNum] != 
null) {
+                                                       if(persistent) 
container.activate(checkBlocks[checkNum], 1);
+                                                       
checkBlocks[checkNum].free();
+                                                       
checkBlocks[checkNum].removeFrom(container);
+                                                       checkBlocks[checkNum] = 
null;
+                                               } else {
+                                                       Logger.error(this, 
"Check block "+checkNum+" failed on "+this+" but bucket is already nulled 
out!");
+                                               }
+                                       }
+                                       // Else we are still registered, but 
will have to be
+                                       // re-selected: for persistent 
requests, the current
+                                       // PersistentChosenRequest will not 
re-run the same block.
+                                       // This is okay!
+                               } else {
+                                       // Better handle it here to minimize 
race conditions. :|
+                                       checkFinished[checkNum] = true;
+                                       checkFailed[checkNum] = false; // 
Treating as succeeded
+                                       blocksCompleted++;
+                                       blocksSucceeded++;
+                                       if(checkBlocks[checkNum] != null) {
+                                               if(persistent) 
container.activate(checkBlocks[checkNum], 1);
+                                               checkBlocks[checkNum].free();
+                                               
checkBlocks[checkNum].removeFrom(container);
+                                               checkBlocks[checkNum] = null;
+                                       } else {
+                                               Logger.error(this, "Check block 
"+checkNum+" succeeded (sort of) on "+this+" but bucket is already nulled 
out!");
+                                       }
+                               }
+                       } else {
+                               // Data block.
+                               if(dataFinished[blockNum]) {
+                                       if(dataFailed[blockNum])
+                                               Logger.error(this, "Got 
onFailure() but block has already failed! Data block "+blockNum+" on "+this);
+                                       else
+                                               Logger.error(this, "Got 
onFailure() but block has already succeeded: Data block "+blockNum+" on "+this);
+                                       return;
+                               }
+                               if(isRNF) {
+                                       dataConsecutiveRNFs[blockNum]++;
+                                       if(persistent) 
container.activate(blockInsertContext, 1);
+                                       if(logMINOR) Logger.minor(this, 
"Consecutive RNFs: "+dataConsecutiveRNFs[blockNum]+" / 
"+blockInsertContext.consecutiveRNFsCountAsSuccess);
+                                       if(dataConsecutiveRNFs[blockNum] == 
blockInsertContext.consecutiveRNFsCountAsSuccess) {
+                                               // Treat as success
+                                               treatAsSuccess = true;
+                                       }
+                               } else {
+                                       dataConsecutiveRNFs[blockNum] = 0;
+                               }
+                               if(!treatAsSuccess) {
+                                       dataRetries[blockNum]++;
+                                       if(dataRetries[blockNum] > maxRetries 
&& maxRetries != -1) {
+                                               failedBlock = true;
+                                               // Treat as failed.
+                                               dataFinished[blockNum] = true;
+                                               dataFailed[blockNum] = true;
+                                               blocksCompleted++;
+                                               if(dataBlocks[blockNum] != 
null) {
+                                                       if(persistent) 
container.activate(dataBlocks[blockNum], 1);
+                                                       
dataBlocks[blockNum].free();
+                                                       
dataBlocks[blockNum].removeFrom(container);
+                                                       dataBlocks[blockNum] = 
null;
+                                               } else {
+                                                       Logger.error(this, 
"Data block "+blockNum+" failed on "+this+" but bucket is already nulled out!");
+                                               }
+                                       }
+                                       // Else we are still registered, but 
will have to be
+                                       // re-selected: for persistent 
requests, the current
+                                       // PersistentChosenRequest will not 
re-run the same block.
+                                       // This is okay!
+                               } else {
+                                       // Better handle it here to minimize 
race conditions. :|
+                                       dataFinished[blockNum] = true;
+                                       dataFailed[blockNum] = false; // 
Treating as succeeded
+                                       blocksCompleted++;
+                                       blocksSucceeded++;
+                                       if(dataBlocks[blockNum] != null && 
encoded) {
+                                               if(persistent) 
container.activate(dataBlocks[blockNum], 1);
+                                               dataBlocks[blockNum].free();
+                                               
dataBlocks[blockNum].removeFrom(container);
+                                               dataBlocks[blockNum] = null;
+                                       } else {
+                                               Logger.error(this, "Data block 
"+blockNum+" succeeded (sort of) on "+this+" but bucket is already nulled 
out!");
+                                       }
+                               }
+                       }
+                       if(persistent)
+                               container.store(this);
+                       completed = blocksCompleted;
+                       succeeded = blocksSucceeded;
+               }
+               if(persistent) {
+                       container.activate(putter, 1);
+                       if(failedBlock)
+                               putter.failedBlock(container, context);
+                       else if(treatAsSuccess)
+                               putter.completedBlock(false, container, 
context);
+                       container.deactivate(putter, 1);
+               }
+               if(succeeded == dataBlocks.length) {
+                       container.activate(parent, 1);
+                       parent.segmentFetchable(this, container);
+                       container.deactivate(parent, 1);
+               } else if(completed == dataBlocks.length + checkBlocks.length) {
+                       container.activate(parent, 1);
+                       finish(container, context, parent);
+                       container.deactivate(parent, 1);
+               }
+       }
+
+       @Override
+       public void onSuccess(Object keyNum, ObjectContainer container, 
ClientContext context) {
+               BlockItem block = (BlockItem) keyNum;
+               int blockNum = block.blockNum;
+               int completed;
+               int succeeded;
+               synchronized(this) {
+                       if(blockNum > dataBlocks.length) {
+                               // Check block.
+                               int checkNum = blockNum = dataBlocks.length;
+                               if(!checkFinished[checkNum]) {
+                                       checkFinished[checkNum] = true;
+                                       checkFailed[checkNum] = false;
+                                       blocksCompleted++;
+                                       blocksSucceeded++;
+                               } else {
+                                       if(checkFailed[checkNum])
+                                               Logger.error(this, "Got 
onSuccess() but block has already failed! Check block "+checkNum+" on "+this);
+                                       else
+                                               Logger.error(this, "Got 
onSuccess() but block has already succeeded: Check block "+checkNum+" on 
"+this);
+                                       return;
+                               }
+                               if(checkBlocks[checkNum] != null) {
+                                       if(persistent) 
container.activate(checkBlocks[checkNum], 1);
+                                       checkBlocks[checkNum].free();
+                                       
checkBlocks[checkNum].removeFrom(container);
+                                       checkBlocks[checkNum] = null;
+                               } else {
+                                       Logger.error(this, "Check block 
"+checkNum+" succeeded on "+this+" but bucket is already nulled out!");
+                               }
+                       } else {
+                               // Data block
+                               if(!dataFinished[blockNum]) {
+                                       dataFinished[blockNum] = true;
+                                       dataFailed[blockNum] = false;
+                                       blocksCompleted++;
+                                       blocksSucceeded++;
+                               } else {
+                                       if(dataFailed[blockNum])
+                                               Logger.error(this, "Got 
onSuccess() but block has already failed! Data block "+blockNum+" on "+this);
+                                       else
+                                               Logger.error(this, "Got 
onSuccess() but block has already succeeded: Data block "+blockNum+" on "+this);
+                                       return;
+                               }
+                               if(encoded && dataBlocks[blockNum] != null) {
+                                       if(persistent) 
container.activate(dataBlocks[blockNum], 1);
+                                       dataBlocks[blockNum].free();
+                                       
dataBlocks[blockNum].removeFrom(container);
+                                       dataBlocks[blockNum] = null;
+                               } else if(dataBlocks[blockNum] == null) {
+                                       Logger.error(this, "Data block 
"+blockNum+" succeeded on "+this+" but bucket is already nulled out!");
+                               }
+                       }
+                       if(persistent)
+                               container.store(this);
+                       completed = blocksCompleted;
+                       succeeded = blocksSucceeded;
+               }
+               if(persistent) {
+                       container.activate(putter, 1);
+                       putter.completedBlock(false, container, context);
+                       container.deactivate(putter, 1);
+               }
+               if(succeeded == dataBlocks.length) {
+                       container.activate(parent, 1);
+                       parent.segmentFetchable(this, container);
+                       container.deactivate(parent, 1);
+               } else if(completed == dataBlocks.length + checkBlocks.length) {
+                       container.activate(parent, 1);
+                       finish(container, context, parent);
+                       container.deactivate(parent, 1);
+               }
+       }
+
+       @Override
+       public SendableRequestItem[] allKeys(ObjectContainer container, 
ClientContext context) {
+               return sendableKeys(container, context);
+       }
+
+       @Override
+       public SendableRequestItem chooseKey(KeysFetchingLocally keys, 
ObjectContainer container, ClientContext context) {
+               if(persistent) {
+                       container.activate(this, 1);
+                       container.activate(blocks, 1);
+               }
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               synchronized(this) {
+                       if(finished) return null;
+                       if(blocks.isEmpty()) {
+                               if(logMINOR)
+                                       Logger.minor(this, "No blocks to 
remove");
+                               return null;
+                       }
+                       for(int i=0;i<10;i++) {
+                               Integer ret;
+                               int x;
+                               if(blocks.size() == 0) return null;
+                               x = context.random.nextInt(blocks.size());
+                               ret = blocks.get(x);
+                               int num = ret;
+                               
+                               // Check whether it is already running
+                               if(!persistent) {
+                                       if(keys.hasTransientInsert(this, new 
FakeBlockItem(num)))
+                                               return null;
+                               }
+
+                               try {
+                                       return getBlockItem(container, context, 
num);
+                               } catch (IOException e) {
+                                       fail(new 
InsertException(InsertException.BUCKET_ERROR, e, null), container, context);
+                                       return null;
+                               }
+                       }
+                       return null;
+               }
+       }
+
+       @Override
+       public RequestClient getClient(ObjectContainer container) {
+               if(persistent) container.activate(putter, 1);
+               return putter.getClient();
+       }
+
+       @Override
+       public ClientRequester getClientRequest() {
+               return putter;
+       }
+
+       @Override
+       public short getPriorityClass(ObjectContainer container) {
+               container.activate(parent, 1);
+               return putter.getPriorityClass();
+       }
+
+       @Override
+       public int getRetryCount() {
+               // No point scheduling inserts by retry count.
+               // FIXME: Either implement sub-segments to schedule by retry 
count,
+               // or (more likely imho) make the scheduler not care about 
retry counts for inserts.
+               return 0;
+       }
+
+       @Override
+       public SendableRequestSender getSender(ObjectContainer container, 
ClientContext context) {
+               return new SendableRequestSender() {
+
+                       public boolean send(NodeClientCore core, 
RequestScheduler sched, final ClientContext context, ChosenBlock req) {
+                               // Ignore keyNum, key, since we're only sending 
one block.
+                               try {
+                                       BlockItem block = (BlockItem) req.token;
+                                       if(logMINOR) Logger.minor(this, 
"Starting request: "+SplitFileInserterSegment.this+" block number 
"+block.blockNum);
+                                       ClientCHKBlock b;
+                                       try {
+                                               b = 
encodeBucket(block.copyBucket);
+                                       } catch (CHKEncodeException e) {
+                                               throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, e.toString() + ":" + 
e.getMessage(), e);
+                                       } catch (MalformedURLException e) {
+                                               throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, e.toString() + ":" + 
e.getMessage(), e);
+                                       } catch (IOException e) {
+                                               throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, e.toString() + ":" + 
e.getMessage(), e);
+                                       } finally {
+                                               block.copyBucket.free();
+                                       }
+                                       final ClientCHK key = (ClientCHK) 
b.getClientKey();
+                                       final int num = block.blockNum;
+                                       if(block.persistent) {
+                                       context.jobRunner.queue(new DBJob() {
+
+                                               public void run(ObjectContainer 
container, ClientContext context) {
+                                                       
container.activate(SplitFileInserterSegment.this, 1);
+                                                       onEncode(num, key, 
container, context);
+                                                       
container.deactivate(SplitFileInserterSegment.this, 1);
+                                               }
+
+                                       }, NativeThread.NORM_PRIORITY+1, false);
+                                       } else {
+                                               
context.mainExecutor.execute(new Runnable() {
+
+                                                       public void run() {
+                                                               onEncode(num, 
key, null, context);
+                                                       }
+                                                       
+                                               }, "Got URI");
+                                               
+                                       }
+                                       if(b != null)
+                                               core.realPut(b, 
req.cacheLocalRequests);
+                                       else {
+                                               Logger.error(this, "Asked to 
send empty block on "+SplitFileInserterSegment.this, new Exception("error"));
+                                               return false;
+                                       }
+                               } catch (LowLevelPutException e) {
+                                       req.onFailure(e, context);
+                                       if(logMINOR) Logger.minor(this, 
"Request failed: "+SplitFileInserterSegment.this+" for "+e);
+                                       return true;
+                               }
+                               if(logMINOR) Logger.minor(this, "Request 
succeeded: "+SplitFileInserterSegment.this);
+                               req.onInsertSuccess(context);
+                               return true;
+                       }
+                       
+               };
+       }
+
+       protected ClientCHKBlock encodeBucket(Bucket copyBucket) throws 
CHKEncodeException, IOException {
+               return ClientCHKBlock.encode(copyBucket, false, true, 
(short)-1, CHKBlock.DATA_LENGTH);
+       }
+
+       @Override
+       public boolean isCancelled(ObjectContainer container) {
+               return finished;
+       }
+
+       @Override
+       public boolean isSSK() {
+               return false;
+       }
+
+       @Override
+       public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest 
request, RequestScheduler sched, ObjectContainer container, ClientContext 
context) {
+               if(persistent) {
+                       container.activate(blocks, 1);
+               }
+               Integer[] blockNumbers;
+               synchronized(this) {
+                       blockNumbers = blocks.toArray(new 
Integer[blocks.size()]);
+               }
+               ArrayList<PersistentChosenBlock> blocks = new 
ArrayList<PersistentChosenBlock>();
+               Arrays.sort(blockNumbers);
+               int prevBlockNumber = -1;
+               for(int i=0;i<blockNumbers.length;i++) {
+                       int blockNumber = blockNumbers[i];
+                       if(blockNumber == prevBlockNumber) {
+                               Logger.error(this, "Duplicate block number in 
makeBlocks() in "+this+": two copies of "+blockNumber);
+                               continue;
+                       }
+                       prevBlockNumber = blockNumber;
+                       SendableRequestItem item;
+                       try {
+                               item = getBlockItem(container, context, 
blockNumber);
+                       } catch (IOException e) {
+                               fail(new 
InsertException(InsertException.BUCKET_ERROR, e, null), container, context);
+                               return null;
+                       }
+                       PersistentChosenBlock block = new 
PersistentChosenBlock(false, request, item, null, null, sched);
+                       if(logMINOR) Logger.minor(this, "Created block 
"+block+" for block number "+blockNumber+" on "+this);
+                       blocks.add(block);
+               }
+               blocks.trimToSize();
+               if(persistent) {
+                       container.deactivate(blocks, 1);
+               }
+               return blocks;
+       }
+
+       @Override
+       public synchronized SendableRequestItem[] sendableKeys(ObjectContainer 
container, ClientContext context) {
+               if(persistent) {
+                       container.activate(blocks, 1);
+               }
+               SendableRequestItem[] items = new 
SendableRequestItem[blocks.size()];
+               for(int i=0;i<blocks.size();i++)
+                       try {
+                               items[i] = getBlockItem(container, context, 
blocks.get(i));
+                       } catch (IOException e) {
+                               fail(new 
InsertException(InsertException.BUCKET_ERROR, e, null), container, context);
+                               return null;
+                       }
+               if(persistent) {
+                       container.deactivate(blocks, 1);
+               }
+               return items;
+       }
+
+       public synchronized boolean isEmpty(ObjectContainer container) {
+               return (finished || blocks.isEmpty());
+       }
 }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to