Author: toad Date: 2008-07-21 18:56:13 +0000 (Mon, 21 Jul 2008) New Revision: 21292
Added: branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java branches/db4o/freenet/src/freenet/node/SupportsBulkCallFailure.java Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java Log: Call the non-fatal failure callback for many items at once from the same subsegment in a single transaction. Should be a significant optimisation. Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java 2008-07-21 18:28:15 UTC (rev 21291) +++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java 2008-07-21 18:56:13 UTC (rev 21292) @@ -3,6 +3,7 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.client.async; +import java.util.HashMap; import java.util.LinkedList; import com.db4o.ObjectContainer; @@ -16,6 +17,7 @@ import freenet.keys.Key; import freenet.keys.KeyBlock; import freenet.node.BaseSendableGet; +import freenet.node.BulkCallFailureItem; import freenet.node.KeysFetchingLocally; import freenet.node.LowLevelGetException; import freenet.node.LowLevelPutException; @@ -26,6 +28,7 @@ import freenet.node.SendableGet; import freenet.node.SendableInsert; import freenet.node.SendableRequest; +import freenet.node.SupportsBulkCallFailure; import freenet.support.Logger; import freenet.support.PrioritizedSerialExecutor; import freenet.support.api.StringCallback; @@ -923,11 +926,60 @@ schedCore.removeChosenRequest(req); } + /** + * Map from SendableGet implementing SupportsBulkCallFailure to BulkCallFailureItem[]. + */ + private transient HashMap bulkFailureLookup = new HashMap(); + + private class BulkCaller implements DBJob { + + private final SupportsBulkCallFailure getter; + + BulkCaller(SupportsBulkCallFailure getter) { + this.getter = getter; + } + + public void run(ObjectContainer container, ClientContext context) { + BulkCallFailureItem[] items; + synchronized(ClientRequestScheduler.this) { + items = (BulkCallFailureItem[]) bulkFailureLookup.get(getter); + bulkFailureLookup.remove(getter); + } + if(items != null && items.length > 0) { + if(logMINOR) Logger.minor(this, "Calling non-fatal failure in bulk for "+items.length+" items"); + getter.onFailure(items, container, context); + } else + Logger.normal(this, "Calling non-fatal failure in bulk for "+getter+" but no items to run"); + } + + public boolean equals(Object o) { + if(o instanceof BulkCaller) { + return ((BulkCaller)o).getter == getter; + } else return false; + } + } + public void callFailure(final SendableGet get, final LowLevelGetException e, final Object keyNum, int prio, final ChosenRequest req, boolean persistent) { if(!persistent) { get.onFailure(e, keyNum, null, clientContext); return; } + if(get instanceof SupportsBulkCallFailure) { + SupportsBulkCallFailure getter = (SupportsBulkCallFailure) get; + BulkCallFailureItem item = new BulkCallFailureItem(e, keyNum); + synchronized(this) { + BulkCallFailureItem[] items = (BulkCallFailureItem[]) bulkFailureLookup.get(get); + if(items == null) { + bulkFailureLookup.put(getter, new BulkCallFailureItem[] { item } ); + } else { + BulkCallFailureItem[] newItems = new BulkCallFailureItem[items.length+1]; + System.arraycopy(items, 0, newItems, 0, items.length); + newItems[items.length] = item; + bulkFailureLookup.put(getter, newItems); + } + } + jobRunner.queue(new BulkCaller(getter), prio, true); + } jobRunner.queue(new DBJob() { public void run(ObjectContainer container, ClientContext context) { Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 2008-07-21 18:28:15 UTC (rev 21291) +++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 2008-07-21 18:56:13 UTC (rev 21292) @@ -547,15 +547,38 @@ if(persistent) { container.activate(blockFetchContext, 1); } + int maxTries = blockFetchContext.maxNonSplitfileRetries; RequestScheduler sched = context.getFetchScheduler(false); + boolean set = onNonFatalFailure(e, blockNo, seg, container, context, sched, maxTries); + if(persistent && set) + container.set(this); + } + + public void onNonFatalFailure(FetchException[] failures, int[] blockNos, SplitFileFetcherSubSegment seg, ObjectContainer container, ClientContext context) { + if(persistent) { + container.activate(blockFetchContext, 1); + } + int maxTries = blockFetchContext.maxNonSplitfileRetries; + RequestScheduler sched = context.getFetchScheduler(false); + boolean set = false; + for(int i=0;i<failures.length;i++) + if(onNonFatalFailure(failures[i], blockNos[i], seg, container, context, sched, maxTries)) + set = true; + if(persistent && set) + container.set(this); + } + + /** + * Caller must set(this) iff returns true. + */ + private boolean onNonFatalFailure(FetchException e, int blockNo, SplitFileFetcherSubSegment seg, ObjectContainer container, ClientContext context, RequestScheduler sched, int maxTries) { int tries; - int maxTries = blockFetchContext.maxNonSplitfileRetries; boolean failed = false; boolean cooldown = false; ClientCHK key; SplitFileFetcherSubSegment sub = null; synchronized(this) { - if(isFinished(container)) return; + if(isFinished(container)) return false; if(blockNo < dataKeys.length) { key = dataKeys[blockNo]; if(persistent) @@ -596,13 +619,11 @@ if(tries != seg.retryCount+1) { Logger.error(this, "Failed on segment "+seg+" but tries for block (after increment) is "+tries); } - if(persistent) - container.set(this); if(failed) { onFatalFailure(e, blockNo, seg, container, context); if(logMINOR) Logger.minor(this, "Not retrying block "+blockNo+" on "+this+" : tries="+tries+"/"+maxTries); - return; + return false; } if(cooldown) { // Registered to cooldown queue @@ -618,6 +639,7 @@ if(sub != null && sub != seg) container.deactivate(sub, 1); container.deactivate(key, 5); } + return true; } private SplitFileFetcherSubSegment getSubSegment(int retryCount, ObjectContainer container, boolean noCreate) { Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java 2008-07-21 18:28:15 UTC (rev 21291) +++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java 2008-07-21 18:56:13 UTC (rev 21292) @@ -17,10 +17,12 @@ import freenet.keys.KeyBlock; import freenet.keys.KeyDecodeException; import freenet.keys.TooBigException; +import freenet.node.BulkCallFailureItem; import freenet.node.KeysFetchingLocally; import freenet.node.LowLevelGetException; import freenet.node.RequestClient; import freenet.node.SendableGet; +import freenet.node.SupportsBulkCallFailure; import freenet.support.Logger; import freenet.support.api.Bucket; @@ -33,7 +35,7 @@ * LOCKING: Synchronize on the parent segment. Nothing else makes sense w.r.t. nested locking. * Note that SendableRequest will occasionally lock on (this). That lock is always taken last. */ -public class SplitFileFetcherSubSegment extends SendableGet { +public class SplitFileFetcherSubSegment extends SendableGet implements SupportsBulkCallFailure { final int retryCount; final SplitFileFetcherSegment segment; @@ -222,49 +224,92 @@ return ctx.ignoreStore; } - // Translate it, then call the real onFailure + // SendableGet has a hashCode() and inherits equals(), which is consistent with the hashCode(). + + public void onFailure(BulkCallFailureItem[] items, ObjectContainer container, ClientContext context) { + FetchException[] fetchExceptions = new FetchException[items.length]; + int countFatal = 0; + for(int i=0;i<items.length;i++) { + fetchExceptions[i] = translateException(items[i].e); + if(fetchExceptions[i].isFatal()) countFatal++; + } + if(persistent) { + container.activate(segment, 1); + container.activate(parent, 1); + container.activate(segment.errors, 1); + } + if(parent.isCancelled()) { + if(Logger.shouldLog(Logger.MINOR, this)) + Logger.minor(this, "Failing: cancelled"); + // Fail the segment. + segment.fail(new FetchException(FetchException.CANCELLED), container, context, false); + // FIXME do we need to free the keyNum's??? Or will that happen later anyway? + return; + } + for(int i=0;i<fetchExceptions.length;i++) + segment.errors.inc(fetchExceptions[i].getMode()); + int nonFatalExceptions = items.length - countFatal; + int[] blockNumbers = new int[nonFatalExceptions]; + if(countFatal > 0) { + FetchException[] newFetchExceptions = new FetchException[items.length - countFatal]; + // Call the fatal callbacks directly. + int x = 0; + for(int i=0;i<items.length;i++) { + int blockNum = ((Integer)items[i].token).intValue(); + if(fetchExceptions[i].isFatal()) { + segment.onFatalFailure(fetchExceptions[i], blockNum, this, container, context); + } else { + blockNumbers[x] = blockNum; + newFetchExceptions[x] = fetchExceptions[i]; + x++; + } + } + fetchExceptions = newFetchExceptions; + } else { + for(int i=0;i<blockNumbers.length;i++) + blockNumbers[i] = ((Integer)items[i].token).intValue(); + } + segment.onNonFatalFailure(fetchExceptions, blockNumbers, this, container, context); + + // TODO Auto-generated method stub + + } + // FIXME refactor this out to a common method; see SimpleSingleFileFetcher - public void onFailure(LowLevelGetException e, Object token, ObjectContainer container, ClientContext context) { - if(logMINOR) - Logger.minor(this, "onFailure("+e+" , "+token+" on "+this); + private FetchException translateException(LowLevelGetException e) { switch(e.code) { case LowLevelGetException.DATA_NOT_FOUND: - onFailure(new FetchException(FetchException.DATA_NOT_FOUND), token, container, context); - return; case LowLevelGetException.DATA_NOT_FOUND_IN_STORE: - onFailure(new FetchException(FetchException.DATA_NOT_FOUND), token, container, context); - return; + return new FetchException(FetchException.DATA_NOT_FOUND); case LowLevelGetException.RECENTLY_FAILED: - onFailure(new FetchException(FetchException.RECENTLY_FAILED), token, container, context); - return; + return new FetchException(FetchException.RECENTLY_FAILED); case LowLevelGetException.DECODE_FAILED: - onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR), token, container, context); - return; + return new FetchException(FetchException.BLOCK_DECODE_ERROR); case LowLevelGetException.INTERNAL_ERROR: - onFailure(new FetchException(FetchException.INTERNAL_ERROR), token, container, context); - return; + return new FetchException(FetchException.INTERNAL_ERROR); case LowLevelGetException.REJECTED_OVERLOAD: - onFailure(new FetchException(FetchException.REJECTED_OVERLOAD), token, container, context); - return; + return new FetchException(FetchException.REJECTED_OVERLOAD); case LowLevelGetException.ROUTE_NOT_FOUND: - onFailure(new FetchException(FetchException.ROUTE_NOT_FOUND), token, container, context); - return; + return new FetchException(FetchException.ROUTE_NOT_FOUND); case LowLevelGetException.TRANSFER_FAILED: - onFailure(new FetchException(FetchException.TRANSFER_FAILED), token, container, context); - return; + return new FetchException(FetchException.TRANSFER_FAILED); case LowLevelGetException.VERIFY_FAILED: - onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR), token, container, context); - return; + return new FetchException(FetchException.BLOCK_DECODE_ERROR); case LowLevelGetException.CANCELLED: - onFailure(new FetchException(FetchException.CANCELLED), token, container, context); - return; + return new FetchException(FetchException.CANCELLED); default: Logger.error(this, "Unknown LowLevelGetException code: "+e.code); - onFailure(new FetchException(FetchException.INTERNAL_ERROR), token, container, context); - return; + return new FetchException(FetchException.INTERNAL_ERROR, "Unknown error code: "+e.code); } } + // Translate it, then call the real onFailure + public void onFailure(LowLevelGetException e, Object token, ObjectContainer container, ClientContext context) { + if(logMINOR) + Logger.minor(this, "onFailure("+e+" , "+token+" on "+this); + onFailure(translateException(e), token, container, context); + } + // Real onFailure protected void onFailure(FetchException e, Object token, ObjectContainer container, ClientContext context) { if(persistent) { Added: branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java =================================================================== --- branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java (rev 0) +++ branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java 2008-07-21 18:56:13 UTC (rev 21292) @@ -0,0 +1,13 @@ +package freenet.node; + +public class BulkCallFailureItem { + + public final LowLevelGetException e; + public final Object token; + + public BulkCallFailureItem(LowLevelGetException e, Object token) { + this.e = e; + this.token = token; + } + +} Added: branches/db4o/freenet/src/freenet/node/SupportsBulkCallFailure.java =================================================================== --- branches/db4o/freenet/src/freenet/node/SupportsBulkCallFailure.java (rev 0) +++ branches/db4o/freenet/src/freenet/node/SupportsBulkCallFailure.java 2008-07-21 18:56:13 UTC (rev 21292) @@ -0,0 +1,16 @@ +package freenet.node; + +import com.db4o.ObjectContainer; + +import freenet.client.async.ClientContext; + +/** + * Normally only implemented by SendableGet's. YOU MUST ALSO IMPLEMENT equals() and a hashCode() consistent with it! + * @author toad + * + */ +public interface SupportsBulkCallFailure { + + /** Process a whole batch of failures at once. */ + public abstract void onFailure(BulkCallFailureItem[] items, ObjectContainer container, ClientContext context); +}
