Author: toad
Date: 2006-08-04 21:14:16 +0000 (Fri, 04 Aug 2006)
New Revision: 9889

Added:
   trunk/freenet/src/freenet/client/async/HealingQueue.java
   trunk/freenet/src/freenet/client/async/SimpleHealingQueue.java
Modified:
   trunk/freenet/src/freenet/client/FetcherContext.java
   trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
   trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
   trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
   trunk/freenet/src/freenet/client/async/USKInserter.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/Version.java
Log:
925: Implement transparent splitfile healing. Backlog can grow up to 16MB, and 
runs at prefetch priority.

Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java        2006-08-04 
18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/FetcherContext.java        2006-08-04 
21:14:16 UTC (rev 9889)
@@ -1,5 +1,6 @@
 package freenet.client;

+import freenet.client.async.HealingQueue;
 import freenet.client.async.USKManager;
 import freenet.client.events.ClientEventProducer;
 import freenet.client.events.SimpleEventProducer;
@@ -43,6 +44,7 @@
        /** If true, and we get a ZIP manifest, and we have no meta-strings 
left, then
         * return the manifest contents as data. */
        public boolean returnZIPManifests;
+       public final HealingQueue healingQueue;

        public FetcherContext(long curMaxLength, 
                        long curMaxTempLength, int maxMetadataSize, int 
maxRecursionLevel, int maxArchiveRestarts, int maxArchiveLevels,
@@ -51,7 +53,7 @@
                        boolean allowSplitfiles, boolean followRedirects, 
boolean localRequestOnly,
                        int maxDataBlocksPerSegment, int 
maxCheckBlocksPerSegment,
                        RandomSource random, ArchiveManager archiveManager, 
BucketFactory bucketFactory,
-                       ClientEventProducer producer, boolean 
cacheLocalRequests, USKManager uskManager) {
+                       ClientEventProducer producer, boolean 
cacheLocalRequests, USKManager uskManager, HealingQueue hq) {
                this.maxOutputLength = curMaxLength;
                this.uskManager = uskManager;
                this.maxTempLength = curMaxTempLength;
@@ -74,9 +76,11 @@
                this.maxDataBlocksPerSegment = maxDataBlocksPerSegment;
                this.maxCheckBlocksPerSegment = maxCheckBlocksPerSegment;
                this.cacheLocalRequests = cacheLocalRequests;
+               this.healingQueue = hq;
        }

        public FetcherContext(FetcherContext ctx, int maskID, boolean 
keepProducer) {
+               this.healingQueue = ctx.healingQueue;
                if(keepProducer)
                        this.eventProducer = ctx.eventProducer;
                else

Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2006-08-04 18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2006-08-04 21:14:16 UTC (rev 9889)
@@ -5,6 +5,7 @@

 import freenet.client.async.ClientGetter;
 import freenet.client.async.ClientPutter;
+import freenet.client.async.HealingQueue;
 import freenet.client.async.SimpleManifestPutter;
 import freenet.client.events.ClientEventListener;
 import freenet.client.events.ClientEventProducer;
@@ -34,6 +35,7 @@
        private long curMaxTempLength;
        private int curMaxMetadataLength;
        private final RandomSource random;
+       private final HealingQueue healingQueue;
        /** See comments in Node */
        private final boolean cacheLocalRequests;
        static final int MAX_RECURSION = 10;
@@ -83,6 +85,7 @@
                curMaxMetadataLength = 1024 * 1024;
                this.cacheLocalRequests = cacheLocalRequests;
                this.persistentBucketFactory = 
node.persistentEncryptedTempBucketFactory;
+               this.healingQueue = node.getHealingQueue();
        }

        public void setMaxLength(long maxLength) {
@@ -174,7 +177,7 @@
                                FETCH_SPLITFILES, FOLLOW_REDIRECTS, 
LOCAL_REQUESTS_ONLY,
                                MAX_SPLITFILE_BLOCKS_PER_SEGMENT, 
MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
                                random, archiveManager, bucketFactory, 
globalEventProducer, 
-                               cacheLocalRequests, node.uskManager);
+                               cacheLocalRequests, node.uskManager, 
healingQueue);
        }

        public InserterContext getInserterContext(boolean forceNonPersistent) {

Added: trunk/freenet/src/freenet/client/async/HealingQueue.java
===================================================================
--- trunk/freenet/src/freenet/client/async/HealingQueue.java    2006-08-04 
18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/async/HealingQueue.java    2006-08-04 
21:14:16 UTC (rev 9889)
@@ -0,0 +1,10 @@
+package freenet.client.async;
+
+import freenet.support.io.Bucket;
+
+public interface HealingQueue {
+
+       /** Queue a Bucket of data to insert as a CHK. */
+       void queue(Bucket data);
+
+}

Added: trunk/freenet/src/freenet/client/async/SimpleHealingQueue.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SimpleHealingQueue.java      
2006-08-04 18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/async/SimpleHealingQueue.java      
2006-08-04 21:14:16 UTC (rev 9889)
@@ -0,0 +1,113 @@
+package freenet.client.async;
+
+import java.util.HashMap;
+
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.client.Metadata;
+import freenet.keys.BaseClientKey;
+import freenet.keys.CHKBlock;
+import freenet.keys.FreenetURI;
+import freenet.support.Logger;
+import freenet.support.io.Bucket;
+
+public class SimpleHealingQueue extends BaseClientPutter implements 
HealingQueue, PutCompletionCallback {
+
+       final int maxRunning;
+       int counter;
+       InserterContext ctx;
+       final HashMap runningInserters;
+       
+       public SimpleHealingQueue(ClientRequestScheduler scheduler, 
InserterContext context, short prio, int maxRunning) {
+               super(prio, scheduler, null, context);
+               this.ctx = context;
+               this.runningInserters = new HashMap();
+               this.maxRunning = maxRunning;
+       }
+
+       public boolean innerQueue(Bucket data) {
+               SingleBlockInserter sbi;
+               int ctr;
+               synchronized(this) {
+                       ctr = counter++;
+                       if(runningInserters.size() > maxRunning) return false;
+                       try {
+                               sbi = new SingleBlockInserter(this, data, 
(short)-1, 
+                                                       
FreenetURI.EMPTY_CHK_URI, ctx, this, false, 
+                                                       CHKBlock.DATA_LENGTH, 
ctr, false, false, false, data);
+                       } catch (Throwable e) {
+                               Logger.error(this, "Caught trying to insert 
healing block: "+e, e);
+                               return false;
+                       }
+                       runningInserters.put(data, sbi);
+               }
+               try {
+                       sbi.schedule();
+                       Logger.minor(this, "Started healing insert "+ctr+" for 
"+data);
+                       return true;
+               } catch (Throwable e) {
+                       Logger.error(this, "Caught trying to insert healing 
block: "+e, e);
+                       return false;
+               }
+       }
+
+       public void queue(Bucket data) {
+               if(!innerQueue(data))
+                       data.free();
+       }
+       
+       public void onMajorProgress() {
+               // Ignore
+       }
+
+       public FreenetURI getURI() {
+               return FreenetURI.EMPTY_CHK_URI;
+       }
+
+       public boolean isFinished() {
+               return false;
+       }
+
+       public void notifyClients() {
+               // Do nothing
+       }
+
+       public void onSuccess(ClientPutState state) {
+               SingleBlockInserter sbi = (SingleBlockInserter)state;
+               Bucket data = (Bucket) sbi.getToken();
+               synchronized(this) {
+                       runningInserters.remove(data);
+               }
+               Logger.minor(this, "Successfully inserted healing block: 
"+sbi.getURI()+" for "+data+" ("+sbi.token+")");
+               data.free();
+       }
+
+       public void onFailure(InserterException e, ClientPutState state) {
+               SingleBlockInserter sbi = (SingleBlockInserter)state;
+               Bucket data = (Bucket) sbi.getToken();
+               synchronized(this) {
+                       runningInserters.remove(data);
+               }
+               Logger.minor(this, "Failed to insert healing block: 
"+sbi.getURI()+" : "+e+" for "+data+" ("+sbi.token+")", e);
+               data.free();
+       }
+
+       public void onEncode(BaseClientKey usk, ClientPutState state) {
+               // Ignore
+       }
+
+       public void onTransition(ClientPutState oldState, ClientPutState 
newState) {
+               // Should never happen
+               Logger.error(this, "impossible: onTransition on 
SimpleHealingQueue from "+oldState+" to "+newState, new Exception("debug"));
+       }
+
+       public void onMetadata(Metadata m, ClientPutState state) {
+               // Should never happen
+               Logger.error(this, "Got metadata on SimpleHealingQueue from 
"+state+": "+m, new Exception("debug"));
+       }
+
+       public void onBlockSetFinished(ClientPutState state) {
+               // Ignore
+       }
+
+}

Modified: trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleBlockInserter.java     
2006-08-04 18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/async/SingleBlockInserter.java     
2006-08-04 21:14:16 UTC (rev 9889)
@@ -44,7 +44,7 @@
        final int sourceLength;
        private int consecutiveRNFs;

-       public SingleBlockInserter(BaseClientPutter parent, Bucket data, short 
compressionCodec, FreenetURI uri, InserterContext ctx, PutCompletionCallback 
cb, boolean isMetadata, int sourceLength, int token, boolean getCHKOnly, 
boolean addToParent, boolean dontSendEncoded, Object tokenObject) throws 
InserterException {
+       public SingleBlockInserter(BaseClientPutter parent, Bucket data, short 
compressionCodec, FreenetURI uri, InserterContext ctx, PutCompletionCallback 
cb, boolean isMetadata, int sourceLength, int token, boolean getCHKOnly, 
boolean addToParent, boolean dontSendEncoded, Object tokenObject) {
                this.consecutiveRNFs = 0;
                this.tokenObject = tokenObject;
                this.token = token;

Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2006-08-04 18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2006-08-04 21:14:16 UTC (rev 9889)
@@ -186,6 +186,13 @@
                        // Now decode
                        Logger.minor(this, "Decoding "+this);

+                       boolean[] dataBlocksSucceeded = new 
boolean[dataBuckets.length];
+                       boolean[] checkBlocksSucceeded = new 
boolean[checkBuckets.length];
+                       for(int i=0;i<dataBuckets.length;i++)
+                               dataBlocksSucceeded[i] = dataBuckets[i].data != 
null;
+                       for(int i=0;i<checkBuckets.length;i++)
+                               checkBlocksSucceeded[i] = checkBuckets[i].data 
!= null;
+                       
                        FECCodec codec = FECCodec.getCodec(splitfileType, 
dataBlocks.length, checkBlocks.length);
                        try {
                                if(splitfileType != 
Metadata.SPLITFILE_NONREDUNDANT) {
@@ -218,51 +225,49 @@

                        // Now heal

-//                     // Encode any check blocks we don't have
-//                     if(codec != null) {
-//                             try {
-//                                     codec.encode(dataBuckets, checkBuckets, 
32768, fetcherContext.bucketFactory);
-//                             } catch (IOException e) {
-//                                     Logger.error(this, "Bucket error while 
healing: "+e, e);
-//                             }
-//                     }
-//                     
-//                     // Now insert *ALL* blocks on which we had at least one 
failure, and didn't eventually succeed
-//                     for(int i=0;i<dataBlockStatus.length;i++) {
-//                             if(dataBuckets[i].getData() != null) continue;
-//                             SingleFileFetcher fetcher = dataBlockStatus[i];
-//                             if(fetcher.getRetryCount() == 0) {
-//                                     // 80% chance of not inserting, if we 
never tried it
-//                                     if(fetcherContext.random.nextInt(5) == 
0) continue;
-//                             }
-//                             queueHeal(dataBuckets[i].getData());
-//                     }
-//                     for(int i=0;i<checkBlockStatus.length;i++) {
-//                             if(checkBuckets[i].getData() != null) continue;
-//                             SingleFileFetcher fetcher = checkBlockStatus[i];
-//                             if(fetcher.getRetryCount() == 0) {
-//                                     // 80% chance of not inserting, if we 
never tried it
-//                                     if(fetcherContext.random.nextInt(5) == 
0) continue;
-//                             }
-//                             queueHeal(checkBuckets[i].getData());
-//                     }
+                       /** Splitfile healing:
+                        * Any block which we have tried and failed to download 
should be 
+                        * reconstructed and reinserted.
+                        */

-                       for(int i=0;i<dataBlocks.length;i++) {
-                               MinimalSplitfileBlock b = dataBuckets[i];
-                               if(b != null) {
-                                       Bucket d = b.getData();
-                                       if(d != null) d.free();
+                       // Encode any check blocks we don't have
+                       if(codec != null) {
+                               try {
+                                       codec.encode(dataBuckets, checkBuckets, 
32768, fetcherContext.bucketFactory);
+                               } catch (IOException e) {
+                                       Logger.error(this, "Bucket error while 
healing: "+e, e);
                                }
+                       }
+                       
+                       // Now insert *ALL* blocks on which we had at least one 
failure, and didn't eventually succeed
+                       for(int i=0;i<dataBlockStatus.length;i++) {
+                               boolean heal = false;
+                               if(!dataBlocksSucceeded[i]) {
+                                       SingleFileFetcher fetcher = 
dataBlockStatus[i];
+                                       if(fetcher.getRetryCount() > 0)
+                                               heal = true;
+                               }
+                               if(heal) {
+                                       queueHeal(dataBuckets[i].getData());
+                               } else {
+                                       dataBuckets[i].data.free();
+                               }
                                dataBuckets[i] = null;
                                dataBlockStatus[i] = null;
                                dataBlocks[i] = null;
                        }
-                       for(int i=0;i<checkBlocks.length;i++) {
-                               MinimalSplitfileBlock b = checkBuckets[i];
-                               if(b != null) {
-                                       Bucket d = b.getData();
-                                       if(d != null) d.free();
+                       for(int i=0;i<checkBlockStatus.length;i++) {
+                               boolean heal = false;
+                               if(!checkBlocksSucceeded[i]) {
+                                       SingleFileFetcher fetcher = 
checkBlockStatus[i];
+                                       if(fetcher.getRetryCount() > 0)
+                                               heal = true;
                                }
+                               if(heal) {
+                                       queueHeal(checkBuckets[i].getData());
+                               } else {
+                                       checkBuckets[i].data.free();
+                               }
                                checkBuckets[i] = null;
                                checkBlockStatus[i] = null;
                                checkBlocks[i] = null;
@@ -272,8 +277,8 @@
        }

        private void queueHeal(Bucket data) {
-               // TODO Auto-generated method stub
-               
+               Logger.minor(this, "Queueing healing insert");
+               fetcherContext.healingQueue.queue(data);
        }

        /** This is after any retries and therefore is either out-of-retries or 
fatal */

Modified: trunk/freenet/src/freenet/client/async/USKInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKInserter.java     2006-08-04 
18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/async/USKInserter.java     2006-08-04 
21:14:16 UTC (rev 9889)
@@ -104,14 +104,9 @@
                if(finished) return;
                edition = edNo;
                Logger.minor(this, "scheduling insert for "+pubUSK.getURI()+" 
"+edition);
+               sbi = new SingleBlockInserter(parent, data, compressionCodec, 
privUSK.getInsertableSSK(edition).getInsertURI(),
+                               ctx, this, isMetadata, sourceLength, token, 
getCHKOnly, false, true /* we don't use it */, tokenObject);
                try {
-                       sbi = new SingleBlockInserter(parent, data, 
compressionCodec, privUSK.getInsertableSSK(edition).getInsertURI(),
-                                       ctx, this, isMetadata, sourceLength, 
token, getCHKOnly, false, true /* we don't use it */, tokenObject);
-               } catch (InserterException e) {
-                       cb.onFailure(e, this);
-                       return;
-               }
-               try {
                        sbi.schedule();
                } catch (InserterException e) {
                        cb.onFailure(e, this);

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2006-08-04 18:05:05 UTC (rev 
9888)
+++ trunk/freenet/src/freenet/node/Node.java    2006-08-04 21:14:16 UTC (rev 
9889)
@@ -39,13 +39,17 @@
 import freenet.client.FetcherContext;
 import freenet.client.HighLevelSimpleClient;
 import freenet.client.HighLevelSimpleClientImpl;
+import freenet.client.InserterContext;
 import freenet.client.InserterException;
 import freenet.client.async.BaseClientPutter;
 import freenet.client.async.ClientCallback;
 import freenet.client.async.ClientGetter;
 import freenet.client.async.ClientPutter;
 import freenet.client.async.ClientRequestScheduler;
+import freenet.client.async.HealingQueue;
+import freenet.client.async.SimpleHealingQueue;
 import freenet.client.async.USKManager;
+import freenet.client.events.SimpleEventProducer;
 import freenet.clients.http.BookmarkManager;
 import freenet.clients.http.FProxyToadlet;
 import freenet.clients.http.SimpleToadletServer;
@@ -665,6 +669,7 @@
        final RequestStarter sskInsertStarter;
        public final UserAlertManager alerts;
        final TimeDecayingRunningAverage throttledPacketSendAverage;
+       private final HealingQueue healingQueue;
        /** Must be included as a hidden field in order for any dangerous HTTP 
operation to complete successfully. */
        public final String formPassword;
        final TimeDecayingRunningAverage remoteChkFetchBytesSentAverage;
@@ -1626,6 +1631,11 @@
                System.out.println("Initializing Plugin Manager");
                pluginManager = new PluginManager(this);

+               healingQueue = new SimpleHealingQueue(chkPutScheduler,
+                               new InserterContext(tempBucketFactory, 
tempBucketFactory, persistentTempBucketFactory, 
+                                               random, 0, 2, 1, 0, 0, new 
SimpleEventProducer(), 
+                                               false, uskManager), 
RequestStarter.PREFETCH_PRIORITY_CLASS, 512 /* FIXME make configurable */);
+               
                FetcherContext ctx = makeClient((short)0).getFetcherContext();

                ctx.allowSplitfiles = false;
@@ -3631,4 +3641,8 @@
                redetectAddress();
                shouldInsertARK();
        }
+
+       public HealingQueue getHealingQueue() {
+               return healingQueue;
+       }
 }

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-08-04 18:05:05 UTC (rev 
9888)
+++ trunk/freenet/src/freenet/node/Version.java 2006-08-04 21:14:16 UTC (rev 
9889)
@@ -18,7 +18,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       private static final int buildNumber = 924;
+       private static final int buildNumber = 925;

        /** Oldest build of Fred we will talk to */
        private static final int oldLastGoodBuild = 874;


Reply via email to