Author: toad
Date: 2008-06-24 21:07:10 +0000 (Tue, 24 Jun 2008)
New Revision: 20663

Modified:
   branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
Log:
More activation and updating work for fetches.
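
For context on the pattern these changes apply: db4o does not load an object's full graph when it comes back from a query; fields beyond the configured activation depth stay null/default until the object is explicitly activated. So each callback that may run against a persistent (on-disk) fetch first calls container.activate(this, 1) before touching fields, and calls container.set(this) after mutating state so the change is written back. A minimal sketch of the idea, assuming the db4o 6.x API (ObjectContainer.activate/set; set() became store() in later releases) -- the class and field names here are hypothetical, not from the Freenet source:

import com.db4o.ObjectContainer;

class PersistentSegment {
        private final boolean persistent; // false for purely in-memory fetches
        private int fetchedBlocks;

        PersistentSegment(boolean persistent) {
                this.persistent = persistent;
        }

        void onBlockFetched(ObjectContainer container) {
                // db4o loads objects lazily: activate to depth 1 before
                // reading or writing this object's own fields.
                if(persistent)
                        container.activate(this, 1);
                fetchedBlocks++;
                // Write the mutated object back to the database.
                if(persistent)
                        container.set(this);
        }
}

Note that activation to depth 1 only makes the object's own fields usable; objects referenced through those fields must be activated separately, which is why the diff below also adds per-element calls such as container.activate(dataBuckets[blockNo], 1) before setData()/getData().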

Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java	2008-06-24 20:52:49 UTC (rev 20662)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java	2008-06-24 21:07:10 UTC (rev 20663)
@@ -157,6 +157,8 @@
        }

        public void onSuccess(Bucket data, int blockNo, SplitFileFetcherSubSegment seg, ClientKeyBlock block, ObjectContainer container, RequestScheduler sched) {
+               if(persistent)
+                       container.activate(this, 1);
                boolean decodeNow = false;
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                if(logMINOR) Logger.minor(this, "Fetched block "+blockNo+" on "+seg);
@@ -173,6 +175,8 @@
                                }
                                dataRetries[blockNo] = 0; // Prevent healing of successfully fetched block.
                                dataKeys[blockNo] = null;
+                               if(persistent)
+                                       container.activate(dataBuckets[blockNo], 1);
                                dataBuckets[blockNo].setData(data);
                        } else if(blockNo < checkKeys.length + dataKeys.length) {
                                blockNo -= dataKeys.length;
@@ -183,6 +187,8 @@
                                }
                                checkRetries[blockNo] = 0; // Prevent healing of successfully fetched block.
                                checkKeys[blockNo] = null;
+                               if(persistent)
+                                       container.activate(checkBuckets[blockNo], 1);
                                checkBuckets[blockNo].setData(data);
                        } else
                                Logger.error(this, "Unrecognized block number: "+blockNo, new Exception("error"));
@@ -199,6 +205,8 @@
                        }
                        dontNotify = !scheduled;
                }
+               if(persistent)
+                       container.set(this);
                parentFetcher.parent.completedBlock(dontNotify, container, sched.getContext());
                seg.possiblyRemoveFromParent(container);
                if(decodeNow) {
@@ -208,10 +216,14 @@
        }

        public void decode(ObjectContainer container, ClientContext context, RequestScheduler sched) {
+               if(persistent)
+                       container.activate(this, 1);
                // Now decode
                if(logMINOR) Logger.minor(this, "Decoding "+SplitFileFetcherSegment.this);

                codec = FECCodec.getCodec(splitfileType, dataKeys.length, checkKeys.length, sched.getContext().mainExecutor);
+               if(persistent)
+                       container.set(this);

                if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
                        FECQueue queue = sched.getFECQueue();
@@ -222,6 +234,8 @@
        }

        public void onDecodedSegment(ObjectContainer container, ClientContext context, FECJob job, Bucket[] dataBuckets2, Bucket[] checkBuckets2, SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus) {
+               if(persistent)
+                       container.activate(this, 1);
                // Because we use SplitfileBlock, we DON'T have to copy here.
                // See FECCallback comments for explanation.
                try {
@@ -241,7 +255,9 @@
                        OutputStream os = decodedData.getOutputStream();
                        for(int i=0;i<dataBuckets.length;i++) {
                                SplitfileBlock status = dataBuckets[i];
+                               if(persistent) container.activate(status, 1);
                                Bucket data = status.getData();
+                               if(persistent) container.activate(data, 1);
                                BucketTools.copyTo(data, os, Long.MAX_VALUE);
                        }
                        if(logMINOR) Logger.minor(this, "Copied data");
@@ -251,12 +267,14 @@
                        finished = true;
                        if(codec == null || !isCollectingBinaryBlob())
                                parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container, context);
+                       if(persistent) container.set(this);
                } catch (IOException e) {
                        Logger.normal(this, "Caught bucket error?: "+e, e);
                        synchronized(this) {
                                finished = true;
                                failureException = new FetchException(FetchException.BUCKET_ERROR);
                        }
+                       if(persistent) container.set(this);
                        parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container, context);
                        return;
                }
@@ -276,13 +294,19 @@
        }

        public void onEncodedSegment(ObjectContainer container, ClientContext context, FECJob job, Bucket[] dataBuckets2, Bucket[] checkBuckets2, SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus) {
+               if(persistent)
+                       container.activate(this, 1);
                // Because we use SplitfileBlock, we DON'T have to copy here.
                // See FECCallback comments for explanation.
                synchronized(this) {
                        // Now insert *ALL* blocks on which we had at least one failure, and didn't eventually succeed
                        for(int i=0;i<dataBuckets.length;i++) {
                                boolean heal = false;
+                               if(persistent)
+                                       container.activate(dataBuckets[i], 1);
                                Bucket data = dataBuckets[i].getData();
+                               if(persistent)
+                                       container.activate(data, 1);
                                if(dataRetries[i] > 0)
                                        heal = true;
                                if(heal) {
@@ -296,7 +320,11 @@
                        }
                        for(int i=0;i<checkBuckets.length;i++) {
                                boolean heal = false;
+                               if(persistent)
+                                       container.activate(checkBuckets[i], 1);
                                Bucket data = checkBuckets[i].getData();
+                               if(persistent)
+                                       container.activate(data, 1);
                                try {
                                        maybeAddToBinaryBlob(data, i, true, container, context);
                                } catch (FetchException e) {
@@ -314,6 +342,8 @@
                                checkKeys[i] = null;
                        }
                }
+               if(persistent)
+                       container.set(this);
                // Defer the completion until we have generated healing blocks if we are collecting binary blobs.
                if(isCollectingBinaryBlob())
                        parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container, context);
@@ -352,6 +382,8 @@
        /** This is after any retries and therefore is either out-of-retries or fatal
         * @param container */
        public synchronized void onFatalFailure(FetchException e, int blockNo, SplitFileFetcherSubSegment seg, ObjectContainer container, ClientContext context) {
+               if(persistent)
+                       container.activate(this, 1);
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                if(logMINOR) Logger.minor(this, "Permanently failed block: "+blockNo+" on "+this+" : "+e, e);
                boolean allFailed;
@@ -385,6 +417,8 @@
                        // Once it is no longer possible to have a successful fetch, fail...
                        allFailed = failedBlocks + fatallyFailedBlocks > (dataKeys.length + checkKeys.length - minFetched);
                }
+               if(persistent)
+                       container.set(this);
                if(allFailed)
                        fail(new FetchException(FetchException.SPLITFILE_ERROR, errors), container, context);
                else
@@ -394,6 +428,8 @@
        /** A request has failed non-fatally, so the block may be retried 
         * @param container */
        public void onNonFatalFailure(FetchException e, int blockNo, SplitFileFetcherSubSegment seg, RequestScheduler sched, ObjectContainer container) {
+               if(persistent)
+                       container.activate(this, 1);
                ClientContext context = sched.getContext();
                int tries;
                int maxTries = blockFetchContext.maxNonSplitfileRetries;
@@ -405,6 +441,8 @@
                        if(isFinished()) return;
                        if(blockNo < dataKeys.length) {
                                key = dataKeys[blockNo];
+                               if(persistent)
+                                       container.activate(key, 5);
                                tries = ++dataRetries[blockNo];
                                if(tries > maxTries && maxTries >= 0) failed = true;
                                else {
@@ -421,6 +459,8 @@
                        } else {
                                int checkNo = blockNo - dataKeys.length;
                                key = checkKeys[checkNo];
+                               if(persistent)
+                                       container.activate(key, 5);
                                tries = ++checkRetries[checkNo];
                                if(tries > maxTries && maxTries >= 0) failed = true;
                                else {
@@ -436,6 +476,8 @@
                                }
                        }
                }
+               if(persistent)
+                       container.set(this);
                if(failed) {
                        onFatalFailure(e, blockNo, seg, container, context);
                        if(logMINOR)
@@ -496,12 +538,21 @@
                        }
                }
                removeSubSegments(container);
+               if(persistent)
+                       container.set(this);
                parentFetcher.segmentFinished(this, container, context);
        }

        public void schedule(ObjectContainer container, ClientContext context) {
+               if(persistent) {
+                       container.activate(this, 1);
+                       container.activate(parentFetcher, 1);
+                       container.activate(parentFetcher.parent, 1);
+               }
                try {
                        SplitFileFetcherSubSegment seg = getSubSegment(0);
+                       if(persistent)
+                               container.activate(seg, 1);
                        for(int i=0;i<dataRetries.length+checkRetries.length;i++)
                                seg.add(i, true, container, context);

@@ -509,6 +560,8 @@
                        synchronized(this) {
                                scheduled = true;
                        }
+                       if(persistent)
+                               container.set(this);
                        parentFetcher.parent.notifyClients(container, context);
                        if(logMINOR)
                                Logger.minor(this, "scheduling "+seg+" : "+seg.blockNums);
@@ -585,6 +638,8 @@
                        deadSegs = (SplitFileFetcherSubSegment[]) subSegments.toArray(new SplitFileFetcherSubSegment[subSegments.size()]);
                        subSegments.clear();
                }
+               if(persistent && deadSegs.length > 0)
+                       container.set(this);
                for(int i=0;i<deadSegs.length;i++) {
                        deadSegs[i].kill(container);
                }
@@ -598,6 +653,8 @@
        }

        public void requeueAfterCooldown(Key key, long time, ObjectContainer container, ClientContext context) {
+               if(persistent)
+                       container.activate(this, 1);
                Vector v = null;
                boolean notFound = true;
                synchronized(this) {
@@ -707,6 +764,8 @@
                        }
                        finished = true;
                }
+               if(persistent)
+                       container.activate(this, 1);
                this.fail(new FetchException(FetchException.INTERNAL_ERROR, "FEC failure: "+t, t), container, context);
        }
 }

