Author: toad
Date: 2008-07-21 18:56:13 +0000 (Mon, 21 Jul 2008)
New Revision: 21292

Added:
   branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java
   branches/db4o/freenet/src/freenet/node/SupportsBulkCallFailure.java
Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
   branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
Log:
Call the non-fatal failure callback for many items at once from the same 
subsegment in a single transaction.
Should be a significant optimisation.

Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java	2008-07-21 18:28:15 UTC (rev 21291)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java	2008-07-21 18:56:13 UTC (rev 21292)
@@ -3,6 +3,7 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.client.async;

+import java.util.HashMap;
 import java.util.LinkedList;

 import com.db4o.ObjectContainer;
@@ -16,6 +17,7 @@
 import freenet.keys.Key;
 import freenet.keys.KeyBlock;
 import freenet.node.BaseSendableGet;
+import freenet.node.BulkCallFailureItem;
 import freenet.node.KeysFetchingLocally;
 import freenet.node.LowLevelGetException;
 import freenet.node.LowLevelPutException;
@@ -26,6 +28,7 @@
 import freenet.node.SendableGet;
 import freenet.node.SendableInsert;
 import freenet.node.SendableRequest;
+import freenet.node.SupportsBulkCallFailure;
 import freenet.support.Logger;
 import freenet.support.PrioritizedSerialExecutor;
 import freenet.support.api.StringCallback;
@@ -923,11 +926,60 @@
                schedCore.removeChosenRequest(req);
        }

+       /**
+        * Map from SendableGet implementing SupportsBulkCallFailure to 
BulkCallFailureItem[].
+        */
+       private transient HashMap bulkFailureLookup = new HashMap();
+       
+       private class BulkCaller implements DBJob {
+               
+               private final SupportsBulkCallFailure getter;
+               
+               BulkCaller(SupportsBulkCallFailure getter) {
+                       this.getter = getter;
+               }
+
+               public void run(ObjectContainer container, ClientContext 
context) {
+                       BulkCallFailureItem[] items;
+                       synchronized(ClientRequestScheduler.this) {
+                               items = (BulkCallFailureItem[]) 
bulkFailureLookup.get(getter);
+                               bulkFailureLookup.remove(getter);
+                       }
+                       if(items != null && items.length > 0) {
+                               if(logMINOR) Logger.minor(this, "Calling 
non-fatal failure in bulk for "+items.length+" items");
+                               getter.onFailure(items, container, context);
+                       } else
+                               Logger.normal(this, "Calling non-fatal failure 
in bulk for "+getter+" but no items to run");
+               }
+               
+               public boolean equals(Object o) {
+                       if(o instanceof BulkCaller) {
+                               return ((BulkCaller)o).getter == getter;
+                       } else return false;
+               }
+       }
+       
        public void callFailure(final SendableGet get, final 
LowLevelGetException e, final Object keyNum, int prio, final ChosenRequest req, 
boolean persistent) {
                if(!persistent) {
                        get.onFailure(e, keyNum, null, clientContext);
                        return;
                }
+               if(get instanceof SupportsBulkCallFailure) {
+                       SupportsBulkCallFailure getter = 
(SupportsBulkCallFailure) get;
+                       BulkCallFailureItem item = new BulkCallFailureItem(e, 
keyNum);
+                       synchronized(this) {
+                               BulkCallFailureItem[] items = 
(BulkCallFailureItem[]) bulkFailureLookup.get(get);
+                               if(items == null) {
+                                       bulkFailureLookup.put(getter, new 
BulkCallFailureItem[] { item } );
+                               } else {
+                                       BulkCallFailureItem[] newItems = new 
BulkCallFailureItem[items.length+1];
+                                       System.arraycopy(items, 0, newItems, 0, 
items.length);
+                                       newItems[items.length] = item;
+                                       bulkFailureLookup.put(getter, newItems);
+                               }
+                       }
+                       jobRunner.queue(new BulkCaller(getter), prio, true);
+               }
                jobRunner.queue(new DBJob() {

                        public void run(ObjectContainer container, 
ClientContext context) {

Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java	2008-07-21 18:28:15 UTC (rev 21291)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java	2008-07-21 18:56:13 UTC (rev 21292)
@@ -547,15 +547,38 @@
                if(persistent) {
                        container.activate(blockFetchContext, 1);
                }
+               int maxTries = blockFetchContext.maxNonSplitfileRetries;
                RequestScheduler sched = context.getFetchScheduler(false);
+               boolean set = onNonFatalFailure(e, blockNo, seg, container, 
context, sched, maxTries);
+               if(persistent && set)
+                       container.set(this);
+       }
+       
+       public void onNonFatalFailure(FetchException[] failures, int[] 
blockNos, SplitFileFetcherSubSegment seg, ObjectContainer container, 
ClientContext context) {
+               if(persistent) {
+                       container.activate(blockFetchContext, 1);
+               }
+               int maxTries = blockFetchContext.maxNonSplitfileRetries;
+               RequestScheduler sched = context.getFetchScheduler(false);
+               boolean set = false;
+               for(int i=0;i<failures.length;i++)
+                       if(onNonFatalFailure(failures[i], blockNos[i], seg, 
container, context, sched, maxTries))
+                               set = true;
+               if(persistent && set)
+                       container.set(this);
+       }
+       
+       /**
+        * Caller must set(this) iff returns true.
+        */
+       private boolean onNonFatalFailure(FetchException e, int blockNo, 
SplitFileFetcherSubSegment seg, ObjectContainer container, ClientContext 
context, RequestScheduler sched, int maxTries) {
                int tries;
-               int maxTries = blockFetchContext.maxNonSplitfileRetries;
                boolean failed = false;
                boolean cooldown = false;
                ClientCHK key;
                SplitFileFetcherSubSegment sub = null;
                synchronized(this) {
-                       if(isFinished(container)) return;
+                       if(isFinished(container)) return false;
                        if(blockNo < dataKeys.length) {
                                key = dataKeys[blockNo];
                                if(persistent)
@@ -596,13 +619,11 @@
                if(tries != seg.retryCount+1) {
                        Logger.error(this, "Failed on segment "+seg+" but tries 
for block (after increment) is "+tries);
                }
-               if(persistent)
-                       container.set(this);
                if(failed) {
                        onFatalFailure(e, blockNo, seg, container, context);
                        if(logMINOR)
                                Logger.minor(this, "Not retrying block 
"+blockNo+" on "+this+" : tries="+tries+"/"+maxTries);
-                       return;
+                       return false;
                }
                if(cooldown) {
                        // Registered to cooldown queue
@@ -618,6 +639,7 @@
                        if(sub != null && sub != seg) container.deactivate(sub, 
1);
                        container.deactivate(key, 5);
                }
+               return true;
        }

        private SplitFileFetcherSubSegment getSubSegment(int retryCount, 
ObjectContainer container, boolean noCreate) {

Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java	2008-07-21 18:28:15 UTC (rev 21291)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java	2008-07-21 18:56:13 UTC (rev 21292)
@@ -17,10 +17,12 @@
 import freenet.keys.KeyBlock;
 import freenet.keys.KeyDecodeException;
 import freenet.keys.TooBigException;
+import freenet.node.BulkCallFailureItem;
 import freenet.node.KeysFetchingLocally;
 import freenet.node.LowLevelGetException;
 import freenet.node.RequestClient;
 import freenet.node.SendableGet;
+import freenet.node.SupportsBulkCallFailure;
 import freenet.support.Logger;
 import freenet.support.api.Bucket;

@@ -33,7 +35,7 @@
  * LOCKING: Synchronize on the parent segment. Nothing else makes sense w.r.t. 
nested locking.
  * Note that SendableRequest will occasionally lock on (this). That lock is 
always taken last.
  */
-public class SplitFileFetcherSubSegment extends SendableGet {
+public class SplitFileFetcherSubSegment extends SendableGet implements 
SupportsBulkCallFailure {

        final int retryCount;
        final SplitFileFetcherSegment segment;
@@ -222,49 +224,92 @@
                return ctx.ignoreStore;
        }

-       // Translate it, then call the real onFailure
+       // SendableGet has a hashCode() and inherits equals(), which is 
consistent with the hashCode().
+       
+       public void onFailure(BulkCallFailureItem[] items, ObjectContainer 
container, ClientContext context) {
+               FetchException[] fetchExceptions = new 
FetchException[items.length];
+               int countFatal = 0;
+               for(int i=0;i<items.length;i++) {
+                       fetchExceptions[i] = translateException(items[i].e);
+                       if(fetchExceptions[i].isFatal()) countFatal++;
+               }
+               if(persistent) {
+                       container.activate(segment, 1);
+                       container.activate(parent, 1);
+                       container.activate(segment.errors, 1);
+               }
+               if(parent.isCancelled()) {
+                       if(Logger.shouldLog(Logger.MINOR, this)) 
+                               Logger.minor(this, "Failing: cancelled");
+                       // Fail the segment.
+                       segment.fail(new 
FetchException(FetchException.CANCELLED), container, context, false);
+                       // FIXME do we need to free the keyNum's??? Or will 
that happen later anyway?
+                       return;
+               }
+               for(int i=0;i<fetchExceptions.length;i++)
+                       segment.errors.inc(fetchExceptions[i].getMode());
+               int nonFatalExceptions = items.length - countFatal;
+               int[] blockNumbers = new int[nonFatalExceptions];
+               if(countFatal > 0) {
+                       FetchException[] newFetchExceptions = new 
FetchException[items.length - countFatal];
+                       // Call the fatal callbacks directly.
+                       int x = 0;
+                       for(int i=0;i<items.length;i++) {
+                               int blockNum = 
((Integer)items[i].token).intValue();
+                               if(fetchExceptions[i].isFatal()) {
+                                       
segment.onFatalFailure(fetchExceptions[i], blockNum, this, container, context);
+                               } else {
+                                       blockNumbers[x] = blockNum;
+                                       newFetchExceptions[x] = 
fetchExceptions[i];
+                                       x++;
+                               }
+                       }
+                       fetchExceptions = newFetchExceptions;
+               } else {
+                       for(int i=0;i<blockNumbers.length;i++)
+                               blockNumbers[i] = 
((Integer)items[i].token).intValue();
+               }
+               segment.onNonFatalFailure(fetchExceptions, blockNumbers, this, 
container, context);
+
+               // TODO Auto-generated method stub
+               
+       }
+       
        // FIXME refactor this out to a common method; see 
SimpleSingleFileFetcher
-       public void onFailure(LowLevelGetException e, Object token, 
ObjectContainer container, ClientContext context) {
-               if(logMINOR)
-                       Logger.minor(this, "onFailure("+e+" , "+token+" on 
"+this);
+       private FetchException translateException(LowLevelGetException e) {
                switch(e.code) {
                case LowLevelGetException.DATA_NOT_FOUND:
-                       onFailure(new 
FetchException(FetchException.DATA_NOT_FOUND), token, container, context);
-                       return;
                case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
-                       onFailure(new 
FetchException(FetchException.DATA_NOT_FOUND), token, container, context);
-                       return;
+                       return new 
FetchException(FetchException.DATA_NOT_FOUND);
                case LowLevelGetException.RECENTLY_FAILED:
-                       onFailure(new 
FetchException(FetchException.RECENTLY_FAILED), token, container, context);
-                       return;
+                       return new 
FetchException(FetchException.RECENTLY_FAILED);
                case LowLevelGetException.DECODE_FAILED:
-                       onFailure(new 
FetchException(FetchException.BLOCK_DECODE_ERROR), token, container, context);
-                       return;
+                       return new 
FetchException(FetchException.BLOCK_DECODE_ERROR);
                case LowLevelGetException.INTERNAL_ERROR:
-                       onFailure(new 
FetchException(FetchException.INTERNAL_ERROR), token, container, context);
-                       return;
+                       return new 
FetchException(FetchException.INTERNAL_ERROR);
                case LowLevelGetException.REJECTED_OVERLOAD:
-                       onFailure(new 
FetchException(FetchException.REJECTED_OVERLOAD), token, container, context);
-                       return;
+                       return new 
FetchException(FetchException.REJECTED_OVERLOAD);
                case LowLevelGetException.ROUTE_NOT_FOUND:
-                       onFailure(new 
FetchException(FetchException.ROUTE_NOT_FOUND), token, container, context);
-                       return;
+                       return new 
FetchException(FetchException.ROUTE_NOT_FOUND);
                case LowLevelGetException.TRANSFER_FAILED:
-                       onFailure(new 
FetchException(FetchException.TRANSFER_FAILED), token, container, context);
-                       return;
+                       return new 
FetchException(FetchException.TRANSFER_FAILED);
                case LowLevelGetException.VERIFY_FAILED:
-                       onFailure(new 
FetchException(FetchException.BLOCK_DECODE_ERROR), token, container, context);
-                       return;
+                       return new 
FetchException(FetchException.BLOCK_DECODE_ERROR);
                case LowLevelGetException.CANCELLED:
-                       onFailure(new FetchException(FetchException.CANCELLED), 
token, container, context);
-                       return;
+                       return new FetchException(FetchException.CANCELLED);
                default:
                        Logger.error(this, "Unknown LowLevelGetException code: 
"+e.code);
-                       onFailure(new 
FetchException(FetchException.INTERNAL_ERROR), token, container, context);
-                       return;
+                       return new 
FetchException(FetchException.INTERNAL_ERROR, "Unknown error code: "+e.code);
                }
        }

+       // Translate it, then call the real onFailure
+       public void onFailure(LowLevelGetException e, Object token, 
ObjectContainer container, ClientContext context) {
+               if(logMINOR)
+                       Logger.minor(this, "onFailure("+e+" , "+token+" on 
"+this);
+               onFailure(translateException(e), token, container, context);
+       }
+
        // Real onFailure
        protected void onFailure(FetchException e, Object token, 
ObjectContainer container, ClientContext context) {
                if(persistent) {

Added: branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java	(rev 0)
+++ branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java	2008-07-21 18:56:13 UTC (rev 21292)
@@ -0,0 +1,13 @@
+package freenet.node;
+
+public class BulkCallFailureItem {
+       
+       public final LowLevelGetException e;
+       public final Object token;
+       
+       public BulkCallFailureItem(LowLevelGetException e, Object token) {
+               this.e = e;
+               this.token = token;
+       }
+
+}

Added: branches/db4o/freenet/src/freenet/node/SupportsBulkCallFailure.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SupportsBulkCallFailure.java	(rev 0)
+++ branches/db4o/freenet/src/freenet/node/SupportsBulkCallFailure.java	2008-07-21 18:56:13 UTC (rev 21292)
@@ -0,0 +1,16 @@
+package freenet.node;
+
+import com.db4o.ObjectContainer;
+
+import freenet.client.async.ClientContext;
+
+/**
+ * Normally only implemented by SendableGet's. YOU MUST ALSO IMPLEMENT 
equals() and a hashCode() consistent with it!
+ * @author toad
+ *
+ */
+public interface SupportsBulkCallFailure {
+       
+       /** Process a whole batch of failures at once. */
+       public abstract void onFailure(BulkCallFailureItem[] items, 
ObjectContainer container, ClientContext context);
+}


Reply via email to