Author: toad Date: 2008-08-12 19:42:51 +0000 (Tue, 12 Aug 2008) New Revision: 21773
Added: branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java branches/db4o/freenet/src/freenet/client/async/PersistentChosenBlock.java branches/db4o/freenet/src/freenet/client/async/TransientChosenBlock.java branches/db4o/freenet/src/freenet/node/SendableGetRequestSender.java branches/db4o/freenet/src/freenet/node/SendableRequestSender.java Removed: branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java Modified: branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java branches/db4o/freenet/src/freenet/client/async/ClientGetter.java branches/db4o/freenet/src/freenet/client/async/ClientPutter.java branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java branches/db4o/freenet/src/freenet/keys/ClientCHK.java branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java branches/db4o/freenet/src/freenet/node/RequestScheduler.java branches/db4o/freenet/src/freenet/node/RequestStarter.java branches/db4o/freenet/src/freenet/node/SendableGet.java branches/db4o/freenet/src/freenet/node/SendableInsert.java branches/db4o/freenet/src/freenet/node/SendableRequest.java branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java Log: Keep the chosen requests list entirely in RAM. Choose one or more SendableRequest's, and fetch all of the blocks on those requests. Drop stuff from the queue if more important requests are registered. This is a major optimisation: Expect approx 40% reduction in worst case time spent on the database thread. Tested for requests but not yet inserts. Modified: branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -3,6 +3,9 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.client.async; +import java.util.Collections; +import java.util.List; + import com.db4o.ObjectContainer; import freenet.client.FetchContext; @@ -91,7 +94,7 @@ if((retryCount <= maxRetries) || (maxRetries == -1)) { if(persistent) container.set(this); - if(retryCount % ClientRequestScheduler.COOLDOWN_RETRIES == 0) { + if(retryCount % RequestScheduler.COOLDOWN_RETRIES == 0) { // Add to cooldown queue. Don't reschedule yet. long now = System.currentTimeMillis(); if(cooldownWakeupTime > now) @@ -254,4 +257,13 @@ } } + @Override + public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest request, RequestScheduler sched, ObjectContainer container, ClientContext context) { + if(persistent) + container.activate(key, 5); + ClientKey ckey = key.cloneKey(); + PersistentChosenBlock block = new PersistentChosenBlock(false, request, keys[0], ckey.getNodeKey(), ckey, sched); + return Collections.singletonList(block); + } + } Added: branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java (rev 0) +++ branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -0,0 +1,73 @@ +package freenet.client.async; + +import freenet.keys.ClientKey; +import freenet.keys.Key; +import freenet.node.LowLevelGetException; +import freenet.node.LowLevelPutException; +import freenet.node.NodeClientCore; +import freenet.node.RequestScheduler; +import freenet.node.SendableRequestSender; + +/** + * A single selected request, including everything needed to execute it. + * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450) + */ +public abstract class ChosenBlock { + + /** The token indicating the key within the request to be fetched/inserted. + * Meaning is entirely defined by the request. */ + public transient final Object token; + /** The key to be fetched, null if not a BaseSendableGet */ + public transient final Key key; + /** The client-layer key to be fetched, null if not a SendableGet */ + public transient final ClientKey ckey; + public transient final boolean localRequestOnly; + public transient final boolean cacheLocalRequests; + public transient final boolean ignoreStore; + + public ChosenBlock(Object token, Key key, ClientKey ckey, boolean localRequestOnly, boolean cacheLocalRequests, boolean ignoreStore, RequestScheduler sched) { + this.token = token; + this.key = key; + this.ckey = ckey; + this.localRequestOnly = localRequestOnly; + this.cacheLocalRequests = cacheLocalRequests; + this.ignoreStore = ignoreStore; + // Ignore return value. Not our problem at this point. + // It won't have been scheduled if there were *no* valid blocks... + // it's possible, but very unlikely, that some blocks are scheduled + // and some are not. + if(key != null) + sched.addToFetching(key); + } + + public abstract boolean isPersistent(); + + public abstract boolean isCancelled(); + + public abstract void onFailure(LowLevelPutException e, ClientContext context); + + public abstract void onInsertSuccess(ClientContext context); + + public abstract void onFailure(LowLevelGetException e, ClientContext context); + + /** + * The actual data delivery goes through CRS.tripPendingKey(). This is just a notification + * for book-keeping purposes. We call the scheduler to tell it that the request succeeded, + * so that it can be rescheduled soon for more requests. + * @param context Might be useful. + */ + public abstract void onFetchSuccess(ClientContext context); + + public abstract short getPriority(); + + public boolean send(NodeClientCore core, RequestScheduler sched) { + ClientContext context = sched.getContext(); + return getSender(context).send(core, sched, context, this); + } + + public abstract SendableRequestSender getSender(ClientContext context); + + public void removeFromFetching(ClientRequestSchedulerCore core) { + core.removeFetchingKey(key); + } +} Deleted: branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -1,88 +0,0 @@ -/* This code is part of Freenet. It is distributed under the GNU General - * Public License, version 2 (or at your option any later version). See - * http://www.gnu.org/ for further details of the GPL. */ -package freenet.client.async; - -import com.db4o.ObjectContainer; - -import freenet.client.FetchContext; -import freenet.keys.ClientKey; -import freenet.keys.Key; -import freenet.node.NodeClientCore; -import freenet.node.RequestScheduler; -import freenet.node.SendableGet; -import freenet.node.SendableRequest; - -/** - * A request chosen by ClientRequestScheduler. - * @author toad - */ -public class ChosenRequest { - - /** The request object */ - public final SendableRequest request; - /** The token indicating the key within the request to be fetched/inserted. - * Meaning is entirely defined by the request. */ - public final Object token; - /** The key to be fetched, null if not a BaseSendableGet */ - public final Key key; - /** The client-layer key to be fetched, null if not a SendableGet */ - public final ClientKey ckey; - /** Priority when we selected it */ - public short prio; - public final boolean localRequestOnly; - public final boolean cacheLocalRequests; - public final boolean ignoreStore; - - ChosenRequest(SendableRequest req, Object tok, Key key, ClientKey ckey, short prio, ObjectContainer container) { - request = req; - token = tok; - this.key = key; - this.ckey = ckey; - this.prio = prio; - if(req instanceof SendableGet) { - SendableGet sg = (SendableGet) req; - FetchContext ctx = sg.getContext(); - if(container != null) - container.activate(ctx, 1); - localRequestOnly = ctx.localRequestOnly; - cacheLocalRequests = ctx.cacheLocalRequests; - ignoreStore = ctx.ignoreStore; - } else { - localRequestOnly = false; - cacheLocalRequests = false; - ignoreStore = false; - } - } - - public boolean send(NodeClientCore core, RequestScheduler sched) { - return request.send(core, sched, this); - } - - public boolean isPersistent() { - return this instanceof PersistentChosenRequest; - } - - public boolean equals(Object o) { - if(!(o instanceof ChosenRequest)) return false; - if(o == this) return true; - ChosenRequest cr = (ChosenRequest) o; - if(!cr.request.equals(request)) return false; - if(!cr.token.equals(token)) return false; - if(cr.key != null) { - if(key != null) { - if(!key.equals(cr.key)) return false; - } else return false; - } else { - if(key != null) return false; - } - if(cr.ckey != null) { - if(ckey != null) { - if(!ckey.equals(cr.ckey)) return false; - } else return false; - } else { - if(ckey != null) return false; - } - return true; - } -} Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetter.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/ClientGetter.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/ClientGetter.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -21,6 +21,7 @@ import freenet.keys.FreenetURI; import freenet.keys.Key; import freenet.node.RequestClient; +import freenet.node.RequestScheduler; import freenet.support.Logger; import freenet.support.api.Bucket; import freenet.support.io.BucketTools; @@ -60,7 +61,7 @@ * write the data directly to the bucket, or copy it and free the original temporary bucket. Preferably the * former, obviously! */ - public ClientGetter(ClientCallback client, ClientRequestScheduler chkSched, ClientRequestScheduler sskSched, + public ClientGetter(ClientCallback client, RequestScheduler chkSched, RequestScheduler sskSched, FreenetURI uri, FetchContext ctx, short priorityClass, RequestClient clientContext, Bucket returnBucket, Bucket binaryBlobBucket) { super(priorityClass, clientContext); this.clientCallback = client; @@ -145,8 +146,10 @@ try { if(Logger.shouldLog(Logger.MINOR, this)) Logger.minor(this, "Copying - returnBucket not respected by client.async"); - if(persistent()) + if(persistent()) { container.activate(from, 5); + container.activate(returnBucket, 5); + } BucketTools.copy(from, to); from.free(); if(persistent()) Modified: branches/db4o/freenet/src/freenet/client/async/ClientPutter.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/ClientPutter.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/ClientPutter.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -16,6 +16,7 @@ import freenet.keys.BaseClientKey; import freenet.keys.FreenetURI; import freenet.node.RequestClient; +import freenet.node.RequestScheduler; import freenet.support.Logger; import freenet.support.SimpleFieldSet; import freenet.support.api.Bucket; @@ -55,7 +56,7 @@ * @param targetFilename If set, create a one-file manifest containing this filename pointing to this file. */ public ClientPutter(ClientCallback client, Bucket data, FreenetURI targetURI, ClientMetadata cm, InsertContext ctx, - ClientRequestScheduler chkScheduler, ClientRequestScheduler sskScheduler, short priorityClass, boolean getCHKOnly, + RequestScheduler chkScheduler, RequestScheduler sskScheduler, short priorityClass, boolean getCHKOnly, boolean isMetadata, RequestClient clientContext, SimpleFieldSet stored, String targetFilename, boolean binaryBlob) { super(priorityClass, clientContext); this.cm = cm; Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -3,6 +3,7 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.client.async; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; @@ -17,7 +18,6 @@ import freenet.keys.Key; import freenet.keys.KeyBlock; import freenet.node.BaseSendableGet; -import freenet.node.BulkCallFailureItem; import freenet.node.KeysFetchingLocally; import freenet.node.LowLevelGetException; import freenet.node.LowLevelPutException; @@ -28,7 +28,6 @@ import freenet.node.SendableGet; import freenet.node.SendableInsert; import freenet.node.SendableRequest; -import freenet.node.SupportsBulkCallFailure; import freenet.support.Logger; import freenet.support.PrioritizedSerialExecutor; import freenet.support.api.StringCallback; @@ -116,8 +115,6 @@ this.selectorContainer = node.db; schedCore = ClientRequestSchedulerCore.create(node, forInserts, forSSKs, selectorContainer, COOLDOWN_PERIOD, core.clientDatabaseExecutor, this, context); schedTransient = new ClientRequestSchedulerNonPersistent(this, forInserts, forSSKs); - schedCore.fillStarterQueue(selectorContainer); - schedCore.start(core); persistentCooldownQueue = schedCore.persistentCooldownQueue; this.databaseExecutor = core.clientDatabaseExecutor; this.datastoreCheckerExecutor = core.datastoreCheckerExecutor; @@ -148,6 +145,11 @@ logMINOR = Logger.shouldLog(Logger.MINOR, this); } + public void start(NodeClientCore core) { + schedCore.start(core); + queueFillRequestStarterQueue(); + } + /** Called by the config. Callback * * @param val @@ -412,11 +414,6 @@ finishRegister(getters, persistent, false, anyValid, regme); } - /** If enabled, if the queue is less than 25% full, attempt to add newly - * registered requests directly to it, storing a PersistentChosenRequest and - * bypassing registration on the queue. Risky optimisation. */ - static final boolean TRY_DIRECT = true; - private void finishRegister(final SendableGet[] getters, boolean persistent, boolean onDatabaseThread, final boolean anyValid, final RegisterMe reg) { if(isInsertScheduler && getters != null) { IllegalStateException e = new IllegalStateException("finishRegister on an insert scheduler"); @@ -424,7 +421,7 @@ for(int i=0;i<getters.length;i++) { if(persistent) selectorContainer.activate(getters[i], 1); - getters[i].internalError(null, e, this, selectorContainer, clientContext, persistent); + getters[i].internalError(e, this, selectorContainer, clientContext, persistent); if(persistent) selectorContainer.deactivate(getters[i], 1); } @@ -439,57 +436,21 @@ } if(persistent) selectorContainer.activate(getters, 1); - boolean tryDirect = false; - if(anyValid && TRY_DIRECT) { - synchronized(starterQueue) { - tryDirect = starterQueue.size() < MAX_STARTER_QUEUE_SIZE * 1 / 4; - } - if(tryDirect) { - for(int i=0;i<getters.length;i++) { - SendableGet getter = getters[i]; - if(persistent) - selectorContainer.activate(getter, 1); - while(true) { - PersistentChosenRequest cr = (PersistentChosenRequest) schedCore.maybeMakeChosenRequest(getter, selectorContainer, clientContext); - if(cr == null) { - if(!(getter.isEmpty(selectorContainer) || getter.isCancelled(selectorContainer))) - // Still needs to be registered - tryDirect = false; - break; - } - synchronized(starterQueue) { - if(logMINOR) Logger.minor(this, "Adding directly to queue: "+cr); - starterQueue.add(cr); - if(starterQueue.size() >= MAX_STARTER_QUEUE_SIZE) { - if(!(getter.isEmpty(selectorContainer) || getter.isCancelled(selectorContainer))) - // Still stuff to register. - tryDirect = false; - break; - } - } - } - if(persistent) - selectorContainer.deactivate(getter, 1); - } - } - } if(logMINOR) Logger.minor(this, "finishRegister() for "+getters); if(anyValid) { - if(!tryDirect) { - boolean wereAnyValid = false; - for(int i=0;i<getters.length;i++) { - SendableGet getter = getters[i]; - selectorContainer.activate(getters[i], 1); - if(!(getter.isCancelled(selectorContainer) || getter.isEmpty(selectorContainer))) { - wereAnyValid = true; - schedCore.innerRegister(getter, random, selectorContainer); - } + boolean wereAnyValid = false; + for(int i=0;i<getters.length;i++) { + SendableGet getter = getters[i]; + selectorContainer.activate(getters[i], 1); + if(!(getter.isCancelled(selectorContainer) || getter.isEmpty(selectorContainer))) { + wereAnyValid = true; + schedCore.innerRegister(getter, random, selectorContainer); } - if(!wereAnyValid) { - Logger.normal(this, "No requests valid: "+getters); - } } + if(!wereAnyValid) { + Logger.normal(this, "No requests valid: "+getters); + } } if(reg != null) selectorContainer.delete(reg); @@ -549,32 +510,51 @@ schedTransient.addPendingKey(key.getNodeKey(), getter, null); } - private synchronized ChosenRequest removeFirst(ObjectContainer container, boolean transientOnly, boolean notTransient) { - if(!databaseExecutor.onThread()) { - throw new IllegalStateException("Not on database thread!"); - } + public ChosenBlock getBetterNonPersistentRequest(short prio, int retryCount) { short fuzz = -1; if(PRIORITY_SOFT.equals(choosenPriorityScheduler)) fuzz = -1; else if(PRIORITY_HARD.equals(choosenPriorityScheduler)) fuzz = 0; - // schedCore juggles both - return schedCore.removeFirst(fuzz, random, offeredKeys, starter, schedTransient, transientOnly, notTransient, Short.MAX_VALUE, Integer.MAX_VALUE, clientContext, container); - } - - public ChosenRequest getBetterNonPersistentRequest(ChosenRequest req) { - short fuzz = -1; - if(PRIORITY_SOFT.equals(choosenPriorityScheduler)) - fuzz = -1; - else if(PRIORITY_HARD.equals(choosenPriorityScheduler)) - fuzz = 0; - if(req == null) - return schedCore.removeFirst(fuzz, random, offeredKeys, starter, schedTransient, true, false, Short.MAX_VALUE, Integer.MAX_VALUE, clientContext, null); - short prio = req.prio; - int retryCount = req.request.getRetryCount(); return schedCore.removeFirst(fuzz, random, offeredKeys, starter, schedTransient, true, false, prio, retryCount, clientContext, null); } + /** + * All the persistent SendableRequest's currently running (either actually in flight, just chosen, + * awaiting the callbacks being executed etc). Note that this is an ArrayList because we *must* + * compare by pointer: these objects may well implement hashCode() etc for use by other code, but + * if they are deactivated, they will be unreliable. Fortunately, this will be fairly small most + * of the time, since a single SendableRequest might include 256 actual requests. + * + * SYNCHRONIZATION: Synched on starterQueue. + */ + private final transient ArrayList<SendableRequest> runningPersistentRequests = new ArrayList<SendableRequest> (); + + public void removeRunningRequest(SendableRequest request) { + synchronized(starterQueue) { + for(int i=0;i<runningPersistentRequests.size();i++) { + if(runningPersistentRequests.get(i) == request) { + runningPersistentRequests.remove(i); + i--; + } + } + } + } + + public boolean isRunningRequest(SendableRequest request) { + synchronized(starterQueue) { + for(int i=0;i<runningPersistentRequests.size();i++) { + if(runningPersistentRequests.get(i) == request) + return true; + } + } + return false; + } + + void startingRequest(SendableRequest request) { + runningPersistentRequests.add(request); + } + /** The maximum number of requests that we will keep on the in-RAM request * starter queue. */ static final int MAX_STARTER_QUEUE_SIZE = 100; @@ -586,31 +566,73 @@ * something odd is happening.. (e.g. leaking PersistentChosenRequest's). */ static final int WARNING_STARTER_QUEUE_SIZE = 300; + private transient LinkedList<PersistentChosenRequest> starterQueue = new LinkedList<PersistentChosenRequest>(); + + /** Length of the starter queue in requests. */ + private transient int starterQueueLength; + /** - * Normally this will only contain PersistentChosenRequest's, however in the - * case of coalescing keys, we will put ChosenRequest's back onto it as well. + * Called by RequestStarter to find a request to run. */ - private transient LinkedList starterQueue = new LinkedList(); - - public LinkedList getRequestStarterQueue() { - return starterQueue; + public ChosenBlock grabRequest() { + while(true) { + PersistentChosenRequest reqGroup; + synchronized(starterQueue) { + reqGroup = starterQueue.isEmpty() ? null : starterQueue.getFirst(); + } + if(reqGroup != null) { + // Try to find a better non-persistent request + ChosenBlock better = getBetterNonPersistentRequest(reqGroup.prio, reqGroup.retryCount); + if(better != null) return better; + } + if(reqGroup == null) { + queueFillRequestStarterQueue(); + return getBetterNonPersistentRequest(Short.MAX_VALUE, Integer.MAX_VALUE); + } + ChosenBlock block; + int length = starterQueueLength; + synchronized(starterQueue) { + block = reqGroup.grabNotStarted(clientContext.fastWeakRandom); + if(block == null) { + for(int i=0;i<starterQueue.size();i++) { + if(starterQueue.get(i) == reqGroup) { + starterQueue.remove(i); + i--; + } + } + continue; + } + starterQueueLength--; + } + if(length < MAX_STARTER_QUEUE_SIZE) + queueFillRequestStarterQueue(); + return block; + } } public void queueFillRequestStarterQueue() { synchronized(starterQueue) { - if(starterQueue.size() > MAX_STARTER_QUEUE_SIZE / 2) + if(starterQueueLength > MAX_STARTER_QUEUE_SIZE / 2) return; } jobRunner.queue(requestStarterQueueFiller, NativeThread.MAX_PRIORITY, true); } - void addToStarterQueue(ChosenRequest req) { + /** + * @param request + * @param container + * @return True if the queue is now full/over-full. + */ + boolean addToStarterQueue(SendableRequest request, ObjectContainer container) { + container.activate(request, 1); + PersistentChosenRequest chosen = new PersistentChosenRequest(request, request.getPriorityClass(container), request.getRetryCount(), container, ClientRequestScheduler.this, clientContext); + container.deactivate(request, 1); synchronized(starterQueue) { - if(starterQueue.contains(req)) { - Logger.error(this, "Not re-adding: "+req); - return; - } - starterQueue.add(req); + // Since we pass in runningPersistentRequests, we don't need to check whether it is already in the starterQueue. + starterQueue.add(chosen); + starterQueueLength += chosen.sizeNotStarted(); + runningPersistentRequests.add(request); + return starterQueueLength < MAX_STARTER_QUEUE_SIZE; } } @@ -626,65 +648,116 @@ private DBJob requestStarterQueueFiller = new DBJob() { public void run(ObjectContainer container, ClientContext context) { if(logMINOR) Logger.minor(this, "Filling request queue... (SSK="+isSSKScheduler+" insert="+isInsertScheduler); - ChosenRequest req = null; + short fuzz = -1; + if(PRIORITY_SOFT.equals(choosenPriorityScheduler)) + fuzz = -1; + else if(PRIORITY_HARD.equals(choosenPriorityScheduler)) + fuzz = 0; synchronized(starterQueue) { - int size = starterQueue.size(); - if(logMINOR) Logger.minor(this, "Queue size: "+size+" SSK="+isSSKScheduler+" insert="+isInsertScheduler); - if(size > MAX_STARTER_QUEUE_SIZE * 3 / 4) { + // Recompute starterQueueLength + int length = 0; + for(PersistentChosenRequest req : starterQueue) + length += req.sizeNotStarted(); + if(length != starterQueueLength) { + Logger.error(this, "Correcting starterQueueLength from "+starterQueueLength+" to "+length); + starterQueueLength = length; + } + if(logMINOR) Logger.minor(this, "Queue size: "+length+" SSK="+isSSKScheduler+" insert="+isInsertScheduler); + if(starterQueueLength > MAX_STARTER_QUEUE_SIZE * 3 / 4) { return; } - if(size >= MAX_STARTER_QUEUE_SIZE) { - if(size >= WARNING_STARTER_QUEUE_SIZE) + if(starterQueueLength >= MAX_STARTER_QUEUE_SIZE) { + if(starterQueueLength >= WARNING_STARTER_QUEUE_SIZE) Logger.error(this, "Queue already full: "+starterQueue.size()); return; } } - SendableRequest lastReq = null; - int sameKey = 0; + while(true) { - req = null; - if(lastReq != null && sameKey < MAX_CONSECUTIVE_SAME_REQ && - lastReq.getParentGrabArray() != null) { - req = schedCore.maybeMakeChosenRequest(lastReq, container, context); - sameKey++; - } - if(req == null) { - req = removeFirst(container, false, true); - if(sameKey > 1) - Logger.normal(this, "Selected "+sameKey+" requests from same SendableRequest: "+lastReq); - sameKey = 0; - } - if(req == null) { - if(lastReq != null) { - container.deactivate(lastReq, 1); + SendableRequest request = schedCore.removeFirstInner(fuzz, random, offeredKeys, starter, schedTransient, false, true, Short.MAX_VALUE, Integer.MAX_VALUE, context, container); + if(request == null) return; + boolean full = addToStarterQueue(request, container); + starter.wakeUp(); + if(full) return; + return; + } + } + }; + + /** + * Compare a recently registered SendableRequest to what is already on the + * starter queue. If it is better, kick out stuff from the queue until we + * are just over the limit. + * @param req + * @param container + */ + public void maybeAddToStarterQueue(SendableRequest req, ObjectContainer container) { + short prio = req.getPriorityClass(container); + int retryCount = req.getRetryCount(); + synchronized(starterQueue) { + boolean allBetter = true; + for(PersistentChosenRequest old : starterQueue) { + if(old.prio < prio) + allBetter = false; + else if(old.prio == prio && old.retryCount <= retryCount) + allBetter = false; + } + if(allBetter && !starterQueue.isEmpty()) return; + } + addToStarterQueue(req, container); + trimStarterQueue(container); + } + + private void trimStarterQueue(ObjectContainer container) { + ArrayList<PersistentChosenRequest> dumped = null; + synchronized(starterQueue) { + while(starterQueueLength > MAX_STARTER_QUEUE_SIZE) { + // Find the lowest priority/retry count request. + // If we can dump it without going below the limit, then do so. + // If we can't, return. + PersistentChosenRequest worst = null; + short worstPrio = -1; + int worstRetryCount = -1; + int worstIndex = -1; + if(starterQueue.isEmpty()) { + if(starterQueueLength != 0) { + Logger.error(this, "Starter queue empty but starterQueueLength is "+starterQueueLength); + starterQueueLength = 0; } - return; + break; } - lastReq = req.request; - if(logMINOR) Logger.minor(this, "Activating key"); - container.activate(req.key, 5); - container.activate(req.ckey, 5); - container.activate(req.request, 1); - container.activate(req.request.getClientRequest(), 1); - if(logMINOR) Logger.minor(this, "Activated"); - synchronized(starterQueue) { - if(req != null) { - starterQueue.add(req); - if(logMINOR) - Logger.minor(this, "Added to starterQueue: "+req+" size now "+starterQueue.size()); - req = null; + for(int i=0;i<starterQueue.size();i++) { + PersistentChosenRequest req = starterQueue.get(i); + short prio = req.prio; + int retryCount = req.retryCount; + if(prio > worstPrio || + (prio == worstPrio && retryCount > worstRetryCount)) { + worstPrio = prio; + worstRetryCount = retryCount; + worst = req; + worstIndex = i; + continue; } - if(starterQueue.size() >= MAX_STARTER_QUEUE_SIZE) { - if(lastReq != null) { - container.deactivate(lastReq, 1); - } - return; - } } + int lengthAfter = starterQueueLength - worst.sizeNotStarted(); + if(lengthAfter >= MAX_STARTER_QUEUE_SIZE) { + if(dumped == null) + dumped = new ArrayList<PersistentChosenRequest>(2); + dumped.add(worst); + starterQueue.remove(worstIndex); + if(lengthAfter == MAX_STARTER_QUEUE_SIZE) break; + } else { + // Can't remove any more. + break; + } } } - }; - + if(dumped == null) return; + for(PersistentChosenRequest req : dumped) { + req.onDumped(schedCore, container); + } + } + public void removePendingKey(final GotKeyListener getter, final boolean complain, final Key key, ObjectContainer container) { if(!getter.persistent()) { boolean dropped = schedTransient.removePendingKey(getter, complain, key, container); @@ -764,14 +837,13 @@ */ static final short TRIP_PENDING_PRIORITY = NativeThread.HIGH_PRIORITY-1; - public synchronized void succeeded(final BaseSendableGet succeeded, final ChosenRequest req) { + public synchronized void succeeded(final BaseSendableGet succeeded, final ChosenBlock req) { if(req.isPersistent()) { jobRunner.queue(new DBJob() { public void run(ObjectContainer container, ClientContext context) { container.activate(succeeded, 1); schedCore.succeeded(succeeded, container); - container.delete((PersistentChosenRequest)req); container.deactivate(succeeded, 1); } @@ -993,152 +1065,40 @@ schedCore.removeFetchingKey(key); } - public void removeChosenRequest(ChosenRequest req) { - schedCore.removeChosenRequest(req); - } - /** * Map from SendableGet implementing SupportsBulkCallFailure to BulkCallFailureItem[]. */ private transient HashMap bulkFailureLookupItems = new HashMap(); private transient HashMap bulkFailureLookupJob = new HashMap(); - - private class BulkCaller implements DBJob { - - private final SupportsBulkCallFailure getter; - - BulkCaller(SupportsBulkCallFailure getter) { - this.getter = getter; - if(getter == null) throw new NullPointerException(); - } - public void run(ObjectContainer container, ClientContext context) { - BulkCallFailureItem[] items; - container.activate(getter, 1); - synchronized(ClientRequestScheduler.this) { - items = (BulkCallFailureItem[]) bulkFailureLookupItems.get(getter); - bulkFailureLookupItems.remove(getter); - bulkFailureLookupJob.remove(getter); - } - if(items != null && items.length > 0) { - if(logMINOR) Logger.minor(this, "Calling non-fatal failure in bulk for "+items.length+" items for "+getter); - getter.onFailure(items, container, context); - for(int i=0;i<items.length;i++) - if(items[i] != null) - container.delete(items[i].req); - } else - Logger.normal(this, "Calling non-fatal failure in bulk for "+getter+" but no items to run for "+getter); - container.deactivate(getter, 1); - } - - public boolean equals(Object o) { - if(o instanceof BulkCaller) { - return ((BulkCaller)o).getter == getter; - } else return false; - } - } - - public void callFailure(final SendableGet get, final LowLevelGetException e, final Object keyNum, final int prio, final ChosenRequest req, boolean persistent) { + public void callFailure(final SendableGet get, final LowLevelGetException e, int prio, boolean persistent) { if(!persistent) { - get.onFailure(e, keyNum, null, clientContext); - return; - } - if(get instanceof SupportsBulkCallFailure) { - // Getter MUST BE ACTIVATED for us to use it as a key. - // The below job doesn't write anything, so it will run fast. + get.onFailure(e, null, null, clientContext); + } else { jobRunner.queue(new DBJob() { public void run(ObjectContainer container, ClientContext context) { - container.activate(get, 1); - if(logMINOR) - Logger.minor(this, "Calling bulk failure for "+get); - SupportsBulkCallFailure getter = (SupportsBulkCallFailure) get; - BulkCallFailureItem item = new BulkCallFailureItem(e, keyNum, (PersistentChosenRequest) req); - BulkCaller caller = null; - synchronized(this) { - BulkCallFailureItem[] items = (BulkCallFailureItem[]) bulkFailureLookupItems.get(get); - if(items == null) { - bulkFailureLookupItems.put(getter, new BulkCallFailureItem[] { item } ); - } else { - BulkCallFailureItem[] newItems = new BulkCallFailureItem[items.length+1]; - System.arraycopy(items, 0, newItems, 0, items.length); - newItems[items.length] = item; - bulkFailureLookupItems.put(getter, newItems); - } - caller = (BulkCaller) bulkFailureLookupJob.get(getter); - if(caller == null) { - caller = new BulkCaller(getter); - bulkFailureLookupJob.put(getter, caller); - } else - caller = null; - - } - if(caller != null) - jobRunner.queue(caller, prio, true); - else { - if(logMINOR) - Logger.minor(this, "Not calling bulk failure for "+get); - } - container.deactivate(get, 1); + get.onFailure(e, null, container, clientContext); } - }, NativeThread.HIGH_PRIORITY, false); - return; + }, prio, false); } - jobRunner.queue(new DBJob() { - - public void run(ObjectContainer container, ClientContext context) { - container.activate(get, 1); - if(logMINOR) - Logger.minor(this, "callFailure() on "+get+" : "+e); - get.onFailure(e, keyNum, selectorContainer, clientContext); - if(logMINOR) - Logger.minor(this, "Deleting "+req); - selectorContainer.delete((PersistentChosenRequest)req); - container.deactivate(get, 1); - } - - }, prio, false); } - public void callFailure(final SendableInsert put, final LowLevelPutException e, final Object keyNum, int prio, final ChosenRequest req, boolean persistent) { + public void callFailure(final SendableInsert insert, final LowLevelPutException e, int prio, boolean persistent) { if(!persistent) { - put.onFailure(e, keyNum, null, clientContext); - return; - } - jobRunner.queue(new DBJob() { + insert.onFailure(e, null, null, clientContext); + } else { + jobRunner.queue(new DBJob() { - public void run(ObjectContainer container, ClientContext context) { - container.activate(put, 1); - put.onFailure(e, keyNum, selectorContainer, clientContext); - if(logMINOR) - Logger.minor(this, "Deleting "+req); - selectorContainer.delete((PersistentChosenRequest)req); - container.deactivate(put, 1); - } - - }, NativeThread.NORM_PRIORITY, false); - } - - public void callSuccess(final SendableInsert put, final Object keyNum, int prio, final ChosenRequest req, boolean persistent) { - if(!persistent) { - put.onSuccess(keyNum, null, clientContext); - return; + public void run(ObjectContainer container, ClientContext context) { + insert.onFailure(e, null, container, context); + } + + }, prio, false); } - jobRunner.queue(new DBJob() { - - public void run(ObjectContainer container, ClientContext context) { - container.activate(put, 1); - put.onSuccess(keyNum, selectorContainer, clientContext); - if(logMINOR) - Logger.minor(this, "Deleting "+req); - selectorContainer.delete((PersistentChosenRequest)req); - container.deactivate(put, 1); - } - - }, NativeThread.NORM_PRIORITY+1, false); } - + public FECQueue getFECQueue() { return clientContext.fecQueue; } @@ -1154,29 +1114,6 @@ return schedCore.addToFetching(key); } - public void requeue(final ChosenRequest req) { - if(req.isPersistent()) { - this.clientContext.jobRunner.queue(new DBJob() { - - public void run(ObjectContainer container, ClientContext context) { - container.activate(req.request, 1); - if(logMINOR) - Logger.minor(this, "Requeueing "+req); - if(req.request.isCancelled(container)) { - container.delete(req); - return; - } - container.deactivate(req.request, 1); - addToStarterQueue(req); - } - - }, NativeThread.HIGH_PRIORITY, false); - } else { - if(req.request.isCancelled(null)) return; - addToStarterQueue(req); - } - } - public long countPersistentQueuedRequests(ObjectContainer container) { return schedCore.countQueuedRequests(container); } @@ -1197,6 +1134,6 @@ else schedTransient.removeFromAllRequestsByClientRequest(get, clientRequest, dontComplain, null); } + - } Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -12,6 +12,7 @@ import freenet.crypt.RandomSource; import freenet.keys.Key; import freenet.node.BaseSendableGet; +import freenet.node.RequestScheduler; import freenet.node.RequestStarter; import freenet.node.SendableInsert; import freenet.node.SendableRequest; @@ -51,6 +52,7 @@ protected final SortedVectorByNumber[] priorities; protected final Map allRequestsByClientRequest; protected final List /* <BaseSendableGet> */ recentSuccesses; + protected transient ClientRequestScheduler sched; abstract boolean persistent(); @@ -90,6 +92,8 @@ container.deactivate(v, 1); addToGrabArray(prio, retryCount, fixRetryCount(retryCount), req.getClient(), req.getClientRequest(), req, random, container); if(logMINOR) Logger.minor(this, "Registered "+req+" on prioclass="+prio+", retrycount="+retryCount+" v.size()="+vSize); + if(req.persistent()) + sched.maybeAddToStarterQueue(req, container); } synchronized void addToGrabArray(short priorityClass, int retryCount, int rc, Object client, ClientRequester cr, SendableRequest req, RandomSource random, ObjectContainer container) { @@ -139,7 +143,7 @@ return Math.max(0, retryCount-MIN_RETRY_COUNT); } - public void reregisterAll(ClientRequester request, RandomSource random, ClientRequestScheduler lock, ObjectContainer container, ClientContext context) { + public void reregisterAll(ClientRequester request, RandomSource random, RequestScheduler lock, ObjectContainer container, ClientContext context) { SendableRequest[] reqs; synchronized(lock) { Set h = (Set) allRequestsByClientRequest.get(request); Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -16,6 +16,7 @@ import com.db4o.types.Db4oList; import com.db4o.types.Db4oMap; +import freenet.client.FetchContext; import freenet.crypt.RandomSource; import freenet.keys.ClientKey; import freenet.keys.Key; @@ -47,7 +48,6 @@ /** Identifier in the database for the node we are attached to */ private final long nodeDBHandle; final PersistentCooldownQueue persistentCooldownQueue; - private transient ClientRequestScheduler sched; private transient long initTime; /** @@ -106,6 +106,7 @@ } else { this.persistentCooldownQueue = null; } + } private void onStarted(ObjectContainer container, long cooldownTime, ClientRequestScheduler sched, ClientContext context) { @@ -119,13 +120,13 @@ if(!isInsertScheduler) { persistentCooldownQueue.setCooldownTime(cooldownTime); } + this.sched = sched; + this.initTime = System.currentTimeMillis(); + // We DO NOT want to rerun the query after consuming the initial set... if(!isInsertScheduler) keysFetching = new HashSet(); else keysFetching = null; - this.sched = sched; - this.initTime = System.currentTimeMillis(); - // We DO NOT want to rerun the query after consuming the initial set... preRegisterMeRunner = new DBJob() { public void run(ObjectContainer container, ClientContext context) { @@ -211,43 +212,6 @@ runner.queue(preRegisterMeRunner, NativeThread.NORM_PRIORITY, true); } - void fillStarterQueue(ObjectContainer container) { - ObjectSet results = container.query(new Predicate() { - public boolean match(PersistentChosenRequest req) { - if(req.core != ClientRequestSchedulerCore.this) return false; - return true; - } - }); - int count = 0; - while(results.hasNext()) { - count++; - PersistentChosenRequest req = (PersistentChosenRequest) results.next(); - container.activate(req, 2); - container.activate(req.key, 5); - container.activate(req.ckey, 5); - if(req.request == null) { - container.delete(req); - Logger.error(this, "Deleting bogus PersistentChosenRequest"); - continue; - } - container.activate(req.request, 1); - container.activate(req.request.getClientRequest(), 1); - if(req.token != null) - container.activate(req.token, 5); - if(req.request.isCancelled(container)) { - container.delete(req); - continue; - } - if(logMINOR) - Logger.minor(this, "Adding old request: "+req); - sched.addToStarterQueue(req); - } -// if(count > ClientRequestScheduler.MAX_STARTER_QUEUE_SIZE) - Logger.error(this, "Added "+count+" requests to the starter queue, size now "+sched.starterQueueSize()); -// else -// Logger.normal(this, "Added "+count+" requests to the starter queue, size now "+sched.starterQueueSize()); - } - // We pass in the schedTransient to the next two methods so that we can select between either of them. private int removeFirstAccordingToPriorities(boolean tryOfferedKeys, int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, short maxPrio, ObjectContainer container){ @@ -288,11 +252,11 @@ // We prevent a number of race conditions (e.g. adding a retry count and then another // thread removes it cos its empty) ... and in addToGrabArray etc we already sync on this. // The worry is ... is there any nested locking outside of the hierarchy? - ChosenRequest removeFirst(int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, RequestStarter starter, ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, boolean notTransient, short maxPrio, int retryCount, ClientContext context, ObjectContainer container) { + ChosenBlock removeFirst(int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, RequestStarter starter, ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, boolean notTransient, short maxPrio, int retryCount, ClientContext context, ObjectContainer container) { SendableRequest req = removeFirstInner(fuzz, random, offeredKeys, starter, schedTransient, transientOnly, notTransient, maxPrio, retryCount, context, container); if(isInsertScheduler && req instanceof SendableGet) { IllegalStateException e = new IllegalStateException("removeFirstInner returned a SendableGet on an insert scheduler!!"); - req.internalError(null, e, sched, container, context, req.persistent()); + req.internalError(e, sched, container, context, req.persistent()); throw e; } return maybeMakeChosenRequest(req, container, context); @@ -300,7 +264,7 @@ private int ctr; - public ChosenRequest maybeMakeChosenRequest(SendableRequest req, ObjectContainer container, ClientContext context) { + public ChosenBlock maybeMakeChosenRequest(SendableRequest req, ObjectContainer container, ClientContext context) { if(req == null) return null; if(req.isEmpty(container) || req.isCancelled(container)) return null; Object token = req.chooseKey(this, req.persistent() ? container : null, context); @@ -319,26 +283,27 @@ else ckey = null; } - ChosenRequest ret; - if(req.persistent()) { - container.activate(key, 5); - container.activate(ckey, 5); - container.activate(req.getClientRequest(), 1); - if(key != null && key.getRoutingKey() == null) - throw new NullPointerException(); - ret = new PersistentChosenRequest(this, req, token, key == null ? null : key.cloneKey(), - ckey == null ? null : ckey.cloneKey(), req.getPriorityClass(container), container); - container.set(ret); - if(logMINOR) - Logger.minor(this, "Storing "+ret+" for "+req); - container.deactivate(key, 5); - container.deactivate(ckey, 5); - container.deactivate(req.getClientRequest(), 1); + ChosenBlock ret; + assert(!req.persistent()); + if(key != null && key.getRoutingKey() == null) + throw new NullPointerException(); + boolean localRequestOnly; + boolean cacheLocalRequests; + boolean ignoreStore; + if(req instanceof SendableGet) { + SendableGet sg = (SendableGet) req; + FetchContext ctx = sg.getContext(); + if(container != null) + container.activate(ctx, 1); + localRequestOnly = ctx.localRequestOnly; + cacheLocalRequests = ctx.cacheLocalRequests; + ignoreStore = ctx.ignoreStore; } else { - if(key != null && key.getRoutingKey() == null) - throw new NullPointerException(); - ret = new ChosenRequest(req, token, key, ckey, req.getPriorityClass(container), null); + localRequestOnly = false; + cacheLocalRequests = false; + ignoreStore = false; } + ret = new TransientChosenBlock(req, token, key, ckey, localRequestOnly, cacheLocalRequests, ignoreStore, sched); return ret; } } @@ -346,9 +311,9 @@ SendableRequest removeFirstInner(int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, RequestStarter starter, ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, boolean notTransient, short maxPrio, int retryCount, ClientContext context, ObjectContainer container) { // Priorities start at 0 if(logMINOR) Logger.minor(this, "removeFirst()"); - boolean tryOfferedKeys = offeredKeys != null && random.nextBoolean(); + boolean tryOfferedKeys = offeredKeys != null && (!notTransient) && random.nextBoolean(); int choosenPriorityClass = removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, schedTransient, transientOnly, maxPrio, container); - if(choosenPriorityClass == -1 && offeredKeys != null && !tryOfferedKeys) { + if(choosenPriorityClass == -1 && offeredKeys != null && (!tryOfferedKeys)) { tryOfferedKeys = true; choosenPriorityClass = removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, schedTransient, transientOnly, maxPrio, container); } @@ -669,7 +634,7 @@ // Cancel the request, and commit so it isn't tried again. if(reg.getters != null) { for(int k=0;k<reg.getters.length;k++) - reg.getters[k].internalError(null, t, sched, container, context, true); + reg.getters[k].internalError(t, sched, container, context, true); } } if(reg.listener != null) @@ -745,21 +710,6 @@ } } - public void removeChosenRequest(final ChosenRequest req) { - int prio = NativeThread.NORM_PRIORITY+1; - assert(prio < ClientRequestScheduler.TRIP_PENDING_PRIORITY); - if(req != null && req.isPersistent()) { - sched.clientContext.jobRunner.queue(new DBJob() { - public void run(ObjectContainer container, ClientContext context) { - container.delete(req); - } - public String toString() { - return "Delete "+req; - } - }, prio, false); - } - } - protected Set makeSetForAllRequestsByClientRequest(ObjectContainer container) { return new Db4oSet(container, 1); } Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -38,6 +38,7 @@ ClientRequestSchedulerNonPersistent(ClientRequestScheduler sched, boolean forInserts, boolean forSSKs) { super(forInserts, forSSKs, new HashMap(), new LinkedList()); + this.sched = sched; recentSuccesses = new LinkedList(); if(forInserts) pendingKeys = null; Modified: branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -4,6 +4,7 @@ package freenet.client.async; import java.util.HashSet; +import java.util.List; import java.util.Vector; import com.db4o.ObjectContainer; @@ -15,6 +16,7 @@ import freenet.node.NodeClientCore; import freenet.node.RequestClient; import freenet.node.RequestScheduler; +import freenet.node.SendableRequestSender; import freenet.node.NodeClientCore.SimpleRequestSenderCompletionListener; import freenet.support.Logger; @@ -140,24 +142,31 @@ return 0; // All keys have equal chance even if they've been tried before. } - public void internalError(Object keyNum, Throwable t, RequestScheduler sched, ObjectContainer container, ClientContext context, boolean persistent) { + public void internalError(Throwable t, RequestScheduler sched, ObjectContainer container, ClientContext context, boolean persistent) { Logger.error(this, "Internal error: "+t, t); } - public boolean send(NodeClientCore node, RequestScheduler sched, ChosenRequest req) { - Key key = (Key) req.token; - // Have to cache it in order to propagate it; FIXME - // Don't let a node force us to start a real request for a specific key. - // We check the datastore, take up offers if any (on a short timeout), and then quit if we still haven't fetched the data. - // Obviously this may have a marginal impact on load but it should only be marginal. - core.asyncGet(key, true, true, new SimpleRequestSenderCompletionListener() { + @Override + public SendableRequestSender getSender(ObjectContainer container, ClientContext context) { + return new SendableRequestSender() { - public void completed(boolean success) { - // Ignore + public boolean send(NodeClientCore core, RequestScheduler sched, ClientContext context, ChosenBlock req) { + Key key = (Key) req.token; + // Have to cache it in order to propagate it; FIXME + // Don't let a node force us to start a real request for a specific key. + // We check the datastore, take up offers if any (on a short timeout), and then quit if we still haven't fetched the data. + // Obviously this may have a marginal impact on load but it should only be marginal. + core.asyncGet(key, true, true, new SimpleRequestSenderCompletionListener() { + + public void completed(boolean success) { + // Ignore + } + + }); + return true; } - }); - return true; + }; } public boolean canRemove(ObjectContainer container) { @@ -185,4 +194,9 @@ return isSSK; } + @Override + public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest request, RequestScheduler sched, ObjectContainer container, ClientContext context) { + throw new UnsupportedOperationException("Transient only"); + } + } Added: branches/db4o/freenet/src/freenet/client/async/PersistentChosenBlock.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/PersistentChosenBlock.java (rev 0) +++ branches/db4o/freenet/src/freenet/client/async/PersistentChosenBlock.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -0,0 +1,160 @@ +package freenet.client.async; + +import freenet.keys.ClientKey; +import freenet.keys.Key; +import freenet.node.LowLevelGetException; +import freenet.node.LowLevelPutException; +import freenet.node.NodeClientCore; +import freenet.node.RequestScheduler; +import freenet.node.SendableGet; +import freenet.node.SendableRequestSender; +import freenet.support.Logger; + +/** + * A block within a ChosenRequest. + * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450) + */ +public class PersistentChosenBlock extends ChosenBlock { + + public final PersistentChosenRequest parent; + public final boolean isInsert; + + /* Completion */ + private boolean finished; + + /** If a SendableGet failed, failedGet will be set to the exception generated. Cannot be null if it failed. */ + private LowLevelGetException failedGet; + + private boolean fetchSucceeded; // The actual block is not our problem. + + /* Inserts */ + private boolean insertSucceeded; + /** If a SendableInsert failed, failedPut will be set to the exception generated. Cannot be null if it failed. */ + private LowLevelPutException failedPut; + + public PersistentChosenBlock(boolean isInsert, PersistentChosenRequest parent, Object token, Key key, ClientKey ckey, RequestScheduler sched) { + super(token, key == null ? null : key.cloneKey(), ckey == null ? null : ckey.cloneKey(), parent.localRequestOnly, parent.cacheLocalRequests, parent.ignoreStore, sched); + this.isInsert = isInsert; + this.parent = parent; + } + + @Override + public void onFetchSuccess(ClientContext context) { + assert(!isInsert); + synchronized(this) { + if(finished) { + Logger.error(this, "Already finished in onSuccess() on "+this, new Exception("debug")); + return; + } + finished = true; + fetchSucceeded = true; + } + parent.onFinished(this, context); + parent.scheduler.succeeded((SendableGet)parent.request, this); + } + + @Override + public void onFailure(LowLevelGetException e, ClientContext context) { + assert(!isInsert); + synchronized(this) { + if(finished) { + Logger.error(this, "Already finished in onFailure() on "+this, new Exception("debug")); + return; + } + if(e == null) + throw new NullPointerException(); + failedGet = e; + finished = true; + } + parent.onFinished(this, context); + } + + @Override + public void onInsertSuccess(ClientContext context) { + assert(isInsert); + synchronized(this) { + if(finished) { + Logger.error(this, "Already finished in onSuccess() on "+this, new Exception("debug")); + return; + } + insertSucceeded = true; + finished = true; + } + parent.onFinished(this, context); + } + + @Override + public void onFailure(LowLevelPutException e, ClientContext context) { + assert(isInsert); + synchronized(this) { + if(finished) { + Logger.error(this, "Already finished in onFailure() on "+this, new Exception("debug")); + return; + } + if(e == null) + throw new NullPointerException(); + failedPut = e; + finished = true; + } + parent.onFinished(this, context); + } + + LowLevelGetException failedGet() { + return failedGet; + } + + boolean insertSucceeded() { + return insertSucceeded; + } + + boolean fetchSucceeded() { + return fetchSucceeded; + } + + LowLevelPutException failedPut() { + return failedPut; + } + + @Override + public boolean isPersistent() { + return true; + } + + @Override + public boolean isCancelled() { + // We can't tell without accesing the database, and we can't access the database on the request starter thread. + return false; + } + + @Override + public boolean send(NodeClientCore core, RequestScheduler sched) { + try { + return super.send(core, sched); + } finally { + boolean wasFinished; + synchronized(this) { + wasFinished = finished; + if(!finished) { + finished = true; + if(parent.request instanceof SendableGet) { + Logger.error(this, "SendableGet "+parent.request+" didn't call a callback on "+this); + } + } + } + if(!wasFinished) { + parent.onFinished(this, sched.getContext()); + } + } + } + + @Override + public short getPriority() { + return parent.prio; + } + + @Override + public SendableRequestSender getSender(ClientContext context) { + return parent.sender; + } + +} Deleted: branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -1,55 +0,0 @@ -/* This code is part of Freenet. It is distributed under the GNU General - * Public License, version 2 (or at your option any later version). See - * http://www.gnu.org/ for further details of the GPL. */ -package freenet.client.async; - -import com.db4o.ObjectContainer; - -import freenet.keys.ClientKey; -import freenet.keys.Key; -import freenet.node.SendableRequest; - -/** - * These must be deleted once the request has been executed. See RegisterMe. - * - * When we choose a request on the database thread, if it is persistent, we store it to the database. - * The reason for this is that we may crash before it completes, in which case we don't want to lose - * it, and the process of choosing a request is destructive i.e. it will be removed from the queue - * structures before removeFirst() returns. If we do it this way we get the best of all worlds: - * - We remove the data from the queue, so the request won't be chosen again until it's done, and - * choosing requests won't have to go over and ignore a lot of slots. - * - If we restart, we will restart the persistent requests we were running, and will therefore get - * all the relevant callbacks. - * @author toad - */ -public class PersistentChosenRequest extends ChosenRequest { - - ClientRequestSchedulerCore core; - // A persistent hashCode is helpful for debugging and lets us put PCR's into hash-based maps and sets. - private final int hashCode; - - PersistentChosenRequest(ClientRequestSchedulerCore core, SendableRequest req, Object tok, Key key, ClientKey ckey, short prio, ObjectContainer container) { - super(req, tok, key, ckey, prio, container); - if(tok == null) throw new NullPointerException(); - int hash = core.hashCode() ^ req.hashCode(); - if(key != null) - hash ^= key.hashCode(); - if(ckey != null) - hash ^= ckey.hashCode(); - if(tok != null) - hash ^= tok.hashCode(); - hashCode = hash; - this.core = core; - } - - public int hashCode() { - return hashCode; - } - - public boolean equals(Object o) { - if(!(o instanceof PersistentChosenRequest)) return false; - PersistentChosenRequest req = (PersistentChosenRequest) o; - if(req.core != core) return false; - return super.equals(o); - } -} Modified: branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -6,6 +6,8 @@ import java.io.IOException; import java.lang.ref.SoftReference; import java.net.MalformedURLException; +import java.util.Collections; +import java.util.List; import com.db4o.ObjectContainer; @@ -25,6 +27,7 @@ import freenet.node.RequestClient; import freenet.node.RequestScheduler; import freenet.node.SendableInsert; +import freenet.node.SendableRequestSender; import freenet.support.Logger; import freenet.support.SimpleFieldSet; import freenet.support.api.Bucket; @@ -342,25 +345,32 @@ return finished; } - public boolean send(NodeClientCore core, RequestScheduler sched, ChosenRequest req) { - // Ignore keyNum, key, since we're only sending one block. - try { - if(logMINOR) Logger.minor(this, "Starting request: "+this); - ClientKeyBlock b = (ClientKeyBlock) req.token; - if(b != null) - core.realPut(b, req.cacheLocalRequests); - else { - Logger.error(this, "Asked to send empty block on "+this, new Exception("error")); - return false; + @Override + public SendableRequestSender getSender(ObjectContainer container, ClientContext context) { + return new SendableRequestSender() { + + public boolean send(NodeClientCore core, RequestScheduler sched, ClientContext context, ChosenBlock req) { + // Ignore keyNum, key, since we're only sending one block. + try { + if(logMINOR) Logger.minor(this, "Starting request: "+this); + ClientKeyBlock b = (ClientKeyBlock) req.token; + if(b != null) + core.realPut(b, req.cacheLocalRequests); + else { + Logger.error(this, "Asked to send empty block on "+this, new Exception("error")); + return false; + } + } catch (LowLevelPutException e) { + req.onFailure(e, context); + if(logMINOR) Logger.minor(this, "Request failed: "+this+" for "+e); + return true; + } + if(logMINOR) Logger.minor(this, "Request succeeded: "+this); + req.onInsertSuccess(context); + return true; } - } catch (LowLevelPutException e) { - sched.callFailure((SendableInsert) this, e, req.token, NativeThread.NORM_PRIORITY, req, req.isPersistent()); - if(logMINOR) Logger.minor(this, "Request failed: "+this+" for "+e); - return true; - } - if(logMINOR) Logger.minor(this, "Request succeeded: "+this); - sched.callSuccess(this, req.token, NativeThread.NORM_PRIORITY, req, req.isPersistent()); - return true; + + }; } public RequestClient getClient() { @@ -413,4 +423,10 @@ return getBlock(container, context, false); } + @Override + public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest request, RequestScheduler sched, ObjectContainer container, ClientContext context) { + PersistentChosenBlock block = new PersistentChosenBlock(true, request, getBlock(container, context, false), null, null, sched); + return Collections.singletonList(block); + } + } Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -5,6 +5,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.HashSet; import java.util.Vector; import com.db4o.ObjectContainer; @@ -562,9 +563,11 @@ } int maxTries = blockFetchContext.maxNonSplitfileRetries; RequestScheduler sched = context.getFetchScheduler(false); - boolean set = onNonFatalFailure(e, blockNo, seg, container, context, sched, maxTries); - if(persistent && set) - container.set(this); + SplitFileFetcherSubSegment sub = onNonFatalFailure(e, blockNo, seg, container, context, sched, maxTries); + if(sub != null) { + sub.schedule(container, context, false, false); + if(persistent && sub != seg) container.deactivate(sub, 1); + } } public void onNonFatalFailure(FetchException[] failures, int[] blockNos, SplitFileFetcherSubSegment seg, ObjectContainer container, ClientContext context) { @@ -574,17 +577,28 @@ int maxTries = blockFetchContext.maxNonSplitfileRetries; RequestScheduler sched = context.getFetchScheduler(false); boolean set = false; - for(int i=0;i<failures.length;i++) - if(onNonFatalFailure(failures[i], blockNos[i], seg, container, context, sched, maxTries)) - set = true; - if(persistent && set) - container.set(this); + HashSet<SplitFileFetcherSubSegment> toSchedule = null; + for(int i=0;i<failures.length;i++) { + SplitFileFetcherSubSegment sub = + onNonFatalFailure(failures[i], blockNos[i], seg, container, context, sched, maxTries); + if(sub != null) { + if(toSchedule == null) + toSchedule = new HashSet<SplitFileFetcherSubSegment>(); + toSchedule.add(sub); + } + } + if(toSchedule != null && !toSchedule.isEmpty()) { + for(SplitFileFetcherSubSegment sub : toSchedule) { + sub.schedule(container, context, false, false); + if(persistent && sub != seg) container.deactivate(sub, 1); + } + } } /** * Caller must set(this) iff returns true. */ - private boolean onNonFatalFailure(FetchException e, int blockNo, SplitFileFetcherSubSegment seg, ObjectContainer container, ClientContext context, RequestScheduler sched, int maxTries) { + private SplitFileFetcherSubSegment onNonFatalFailure(FetchException e, int blockNo, SplitFileFetcherSubSegment seg, ObjectContainer container, ClientContext context, RequestScheduler sched, int maxTries) { if(logMINOR) Logger.minor(this, "Calling onNonFatalFailure for block "+blockNo+" on "+this+" from "+seg); int tries; boolean failed = false; @@ -592,7 +606,7 @@ ClientCHK key; SplitFileFetcherSubSegment sub = null; synchronized(this) { - if(isFinished(container)) return false; + if(isFinished(container)) return null; if(blockNo < dataKeys.length) { key = dataKeys[blockNo]; if(persistent) @@ -601,7 +615,7 @@ if(tries > maxTries && maxTries >= 0) failed = true; else { sub = getSubSegment(tries, container, false, seg); - if(tries % ClientRequestScheduler.COOLDOWN_RETRIES == 0) { + if(tries % RequestScheduler.COOLDOWN_RETRIES == 0) { long now = System.currentTimeMillis(); if(dataCooldownTimes[blockNo] > now) Logger.error(this, "Already on the cooldown queue! for "+this+" data block no "+blockNo, new Exception("error")); @@ -619,7 +633,7 @@ if(tries > maxTries && maxTries >= 0) failed = true; else { sub = getSubSegment(tries, container, false, seg); - if(tries % ClientRequestScheduler.COOLDOWN_RETRIES == 0) { + if(tries % RequestScheduler.COOLDOWN_RETRIES == 0) { long now = System.currentTimeMillis(); if(checkCooldownTimes[checkNo] > now) Logger.error(this, "Already on the cooldown queue! for "+this+" check block no "+blockNo, new Exception("error")); @@ -637,23 +651,27 @@ onFatalFailure(e, blockNo, seg, container, context); if(logMINOR) Logger.minor(this, "Not retrying block "+blockNo+" on "+this+" : tries="+tries+"/"+maxTries); - return false; + return null; } + boolean mustSchedule = false; if(cooldown) { // Registered to cooldown queue if(logMINOR) Logger.minor(this, "Added to cooldown queue: "+key+" for "+this+" was on segment "+seg+" now registered to "+sub); } else { // If we are here we are going to retry - sub.add(blockNo, false, container, context, false); + mustSchedule = sub.add(blockNo, true, container, context, false); if(logMINOR) Logger.minor(this, "Retrying block "+blockNo+" on "+this+" : tries="+tries+"/"+maxTries+" : "+sub); } if(persistent) { - if(sub != null && sub != seg) container.deactivate(sub, 1); + container.set(this); container.deactivate(key, 5); } - return true; + if(mustSchedule) + return sub; + else + return null; } private SplitFileFetcherSubSegment getSubSegment(int retryCount, ObjectContainer container, boolean noCreate, SplitFileFetcherSubSegment dontDeactivate) { @@ -743,14 +761,15 @@ if(logMINOR) Logger.minor(this, "scheduling "+seg+" : "+seg.blockNums); - seg.schedule(container, context, true, regmeOnly); - if(persistent) - container.deactivate(seg, 1); synchronized(this) { scheduled = true; } if(persistent) container.set(this); + // Schedule(true) will deactivate me, so we need to do it after storing scheduled. + seg.schedule(container, context, true, regmeOnly); + if(persistent) + container.deactivate(seg, 1); } catch (Throwable t) { Logger.error(this, "Caught "+t+" scheduling "+this, t); fail(new FetchException(FetchException.INTERNAL_ERROR, t), container, context, true); @@ -1106,7 +1125,7 @@ if(persistent) container.activate(seg, 1); if(seg != null) { - seg.removeBlockNum(blockNum, container); + seg.removeBlockNum(blockNum, container, false); seg.possiblyRemoveFromParent(container, context); } for(int i=0;i<subSegments.size();i++) { @@ -1114,7 +1133,7 @@ if(checkSeg == seg) continue; if(persistent) container.activate(checkSeg, 1); - if(checkSeg.removeBlockNum(blockNum, container)) + if(checkSeg.removeBlockNum(blockNum, container, false)) Logger.error(this, "Block number "+blockNum+" was registered to wrong subsegment "+checkSeg+" should be "+seg); if(persistent) container.deactivate(checkSeg, 1); Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -1,6 +1,8 @@ package freenet.client.async; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Vector; import com.db4o.ObjectContainer; @@ -21,6 +23,7 @@ import freenet.node.KeysFetchingLocally; import freenet.node.LowLevelGetException; import freenet.node.RequestClient; +import freenet.node.RequestScheduler; import freenet.node.SendableGet; import freenet.node.SupportsBulkCallFailure; import freenet.support.Logger; @@ -229,11 +232,17 @@ public void onFailure(BulkCallFailureItem[] items, ObjectContainer container, ClientContext context) { FetchException[] fetchExceptions = new FetchException[items.length]; int countFatal = 0; + if(persistent) { + container.activate(blockNums, 2); + } for(int i=0;i<items.length;i++) { fetchExceptions[i] = translateException(items[i].e); if(fetchExceptions[i].isFatal()) countFatal++; + removeBlockNum(((Integer)items[i].token).intValue(), container, true); } if(persistent) { + container.set(blockNums); + container.deactivate(blockNums, 2); container.activate(segment, 1); container.activate(parent, 1); container.activate(segment.errors, 1); @@ -335,6 +344,7 @@ } else { segment.onNonFatalFailure(e, ((Integer)token).intValue(), this, container, context); } + removeBlockNum(((Integer)token).intValue(), container, false); if(persistent) { container.deactivate(segment, 1); container.deactivate(parent, 1); @@ -532,7 +542,10 @@ } - public void add(int blockNo, boolean dontSchedule, ObjectContainer container, ClientContext context, boolean dontComplainOnDupes) { + /** + * @return True if the caller should schedule. + */ + public boolean add(int blockNo, boolean dontSchedule, ObjectContainer container, ClientContext context, boolean dontComplainOnDupes) { if(persistent) { // container.activate(segment, 1); container.activate(blockNums, 1); @@ -554,7 +567,6 @@ } else { blockNums.add(i); } - if(dontSchedule) schedule = false; /** * Race condition: * @@ -575,8 +587,11 @@ } if(persistent) container.set(blockNums); - if(schedule) + if(schedule) { + if(dontSchedule) return true; context.getChkFetchScheduler().register(null, new SendableGet[] { this }, false, persistent, true, null, null); + } + return false; } public String toString() { @@ -729,9 +744,9 @@ getScheduler(context).register(firstTime ? segment : null, new SendableGet[] { this }, regmeOnly, persistent, true, segment.blockFetchContext.blocks, null); } - public boolean removeBlockNum(int blockNum, ObjectContainer container) { + public boolean removeBlockNum(int blockNum, ObjectContainer container, boolean callerActivatesAndSets) { if(logMINOR) Logger.minor(this, "Removing block "+blockNum+" from "+this); - if(persistent) + if(persistent && !callerActivatesAndSets) container.activate(blockNums, 2); boolean found = false; synchronized(segment) { @@ -746,11 +761,41 @@ } } } - if(persistent) { + if(persistent && !callerActivatesAndSets) { container.set(blockNums); container.deactivate(blockNums, 2); } return found; } + @Override + public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest request, RequestScheduler sched, ObjectContainer container, ClientContext context) { + if(persistent) { + container.activate(segment, 1); + container.activate(blockNums, 1); + } + Integer[] blockNumbers; + synchronized(this) { + blockNumbers = (Integer[]) blockNums.toArray(new Integer[blockNums.size()]); + } + ArrayList<PersistentChosenBlock> blocks = new ArrayList<PersistentChosenBlock>(); + for(int i=0;i<blockNumbers.length;i++) { + ClientKey key = segment.getBlockKey(blockNumbers[i], container); + Key k = key.getNodeKey(); + if(key == null) { + if(logMINOR) + Logger.minor(this, "Block "+blockNumbers[i]+" is null, maybe race condition"); + continue; + } + PersistentChosenBlock block = new PersistentChosenBlock(false, request, blockNumbers[i], k, key, sched); + blocks.add(block); + } + blocks.trimToSize(); + if(persistent) { + container.deactivate(segment, 1); + container.deactivate(blockNums, 1); + } + return blocks; + } + } Added: branches/db4o/freenet/src/freenet/client/async/TransientChosenBlock.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/TransientChosenBlock.java (rev 0) +++ branches/db4o/freenet/src/freenet/client/async/TransientChosenBlock.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -0,0 +1,72 @@ +package freenet.client.async; + +import freenet.keys.ClientKey; +import freenet.keys.ClientKeyBlock; +import freenet.keys.Key; +import freenet.node.LowLevelGetException; +import freenet.node.LowLevelPutException; +import freenet.node.RequestScheduler; +import freenet.node.SendableGet; +import freenet.node.SendableInsert; +import freenet.node.SendableRequest; +import freenet.node.SendableRequestSender; + +/** + * A ChosenBlock which isn't persistent. + * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450) + * + */ +public class TransientChosenBlock extends ChosenBlock { + + public final SendableRequest request; + public final RequestScheduler sched; + + public TransientChosenBlock(SendableRequest req, Object token, Key key, ClientKey ckey, + boolean localRequestOnly, boolean cacheLocalRequests, boolean ignoreStore, RequestScheduler sched) { + super(token, key, ckey, localRequestOnly, cacheLocalRequests, ignoreStore, sched); + this.request = req; + this.sched = sched; + } + + @Override + public boolean isCancelled() { + return request.isCancelled(null); + } + + @Override + public boolean isPersistent() { + return false; + } + + public void onFailure(LowLevelPutException e, ClientContext context) { + ((SendableInsert) request).onFailure(e, token, null, context); + } + + public void onInsertSuccess(ClientContext context) { + ((SendableInsert) request).onSuccess(token, null, context); + } + + public void onFailure(LowLevelGetException e, ClientContext context) { + ((SendableGet) request).onFailure(e, token, null, context); + } + + public void onSuccess(ClientKeyBlock data, boolean fromStore, ClientContext context) { + ((SendableGet) request).onSuccess(data, fromStore, token, null, context); + } + + @Override + public void onFetchSuccess(ClientContext context) { + sched.succeeded((SendableGet)request, this); + } + + @Override + public short getPriority() { + return request.getPriorityClass(null); + } + + @Override + public SendableRequestSender getSender(ClientContext context) { + return request.getSender(null, context); + } + +} Modified: branches/db4o/freenet/src/freenet/keys/ClientCHK.java =================================================================== --- branches/db4o/freenet/src/freenet/keys/ClientCHK.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/keys/ClientCHK.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -40,7 +40,7 @@ private ClientCHK(ClientCHK key) { this.routingKey = new byte[key.routingKey.length]; System.arraycopy(key.routingKey, 0, routingKey, 0, key.routingKey.length); - this.nodeKey = (NodeCHK) key.nodeKey.cloneKey(); + this.nodeKey = null; this.cryptoKey = new byte[key.cryptoKey.length]; System.arraycopy(key.cryptoKey, 0, cryptoKey, 0, key.cryptoKey.length); this.controlDocument = key.controlDocument; Modified: branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java =================================================================== --- branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -1,18 +1,13 @@ package freenet.node; -import freenet.client.async.PersistentChosenRequest; - public class BulkCallFailureItem { public final LowLevelGetException e; public final Object token; - /** Removed by ClientRequestScheduler, implementor of SupportsBulkCallFailure should ignore. */ - public final PersistentChosenRequest req; - public BulkCallFailureItem(LowLevelGetException e, Object token, PersistentChosenRequest req) { + public BulkCallFailureItem(LowLevelGetException e, Object token) { this.e = e; this.token = token; - this.req = req; } } Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java =================================================================== --- branches/db4o/freenet/src/freenet/node/RequestScheduler.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -6,7 +6,7 @@ import java.util.LinkedList; import freenet.client.FECQueue; -import freenet.client.async.ChosenRequest; +import freenet.client.async.ChosenBlock; import freenet.client.async.ClientContext; import freenet.keys.ClientKey; import freenet.keys.Key; @@ -20,7 +20,7 @@ * another may also work. Also, delete the ChosenRequest if it is persistent. * @param req The request we ran, which must be deleted. * */ - public void succeeded(BaseSendableGet get, ChosenRequest req); + public void succeeded(BaseSendableGet get, ChosenBlock req); /** * After a key has been requested a few times, it is added to the cooldown queue for @@ -49,34 +49,26 @@ public void queueFillRequestStarterQueue(); - public LinkedList getRequestStarterQueue(); - - public ChosenRequest getBetterNonPersistentRequest(ChosenRequest req); - public KeysFetchingLocally fetchingKeys(); public void removeFetchingKey(Key key); - public void removeChosenRequest(ChosenRequest req); - - /** Call onFailure() on the database thread, then delete the PersistentChosenRequest. For a non-persistent request, - * just call onFailure() immediately. */ - public void callFailure(final SendableGet get, final LowLevelGetException e, final Object keyNum, int prio, ChosenRequest req, boolean persistent); + public void callFailure(SendableGet get, LowLevelGetException e, int prio, boolean persistent); - /** Call onFailure() on the database thread, then delete the PersistentChosenRequest. For a non-persistent request, - * just call onFailure() immediately. */ - public void callFailure(final SendableInsert put, final LowLevelPutException e, final Object keyNum, int prio, ChosenRequest req, boolean persistent); - - /** Call onSuccess() on the database thread, then delete the PersistentChosenRequest. For a non-persistent request, - * just call onFailure() immediately. */ - public void callSuccess(final SendableInsert put, final Object keyNum, int prio, ChosenRequest req, boolean persistent); - + public void callFailure(SendableInsert insert, LowLevelPutException exception, int prio, boolean persistent); + public FECQueue getFECQueue(); public ClientContext getContext(); public boolean addToFetching(Key key); - public void requeue(ChosenRequest req); - + public ChosenBlock grabRequest(); + + public void removeRunningRequest(SendableRequest request); + + public abstract boolean isRunningRequest(SendableRequest request); + + public void start(NodeClientCore core); + } Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java =================================================================== --- branches/db4o/freenet/src/freenet/node/RequestStarter.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/node/RequestStarter.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -3,11 +3,9 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.node; -import java.util.LinkedList; - import com.db4o.ObjectContainer; -import freenet.client.async.ChosenRequest; +import freenet.client.async.ChosenBlock; import freenet.client.async.ClientContext; import freenet.keys.Key; import freenet.support.Logger; @@ -53,9 +51,6 @@ return !((prio < MAXIMUM_PRIORITY_CLASS) || (prio > MINIMUM_PRIORITY_CLASS)); } - /** Queue of requests to run. Added to by jobs on the database thread. */ - private LinkedList queue; - final BaseRequestThrottle throttle; final TokenBucket inputBucket; final TokenBucket outputBucket; @@ -84,10 +79,10 @@ void setScheduler(RequestScheduler sched) { this.sched = sched; - queue = sched.getRequestStarterQueue(); } void start() { + sched.start(core); core.getExecutor().execute(this, name); sched.queueFillRequestStarterQueue(); } @@ -99,7 +94,7 @@ } void realRun() { - ChosenRequest req = null; + ChosenBlock req = null; sentRequestTime = System.currentTimeMillis(); // The last time at which we sent a request or decided not to long cycleTime = sentRequestTime; @@ -121,10 +116,10 @@ sched.moveKeysFromCooldownQueue(); boolean logMINOR = Logger.shouldLog(Logger.MINOR, this); if(req == null) { - req = getRequest(logMINOR); + req = sched.grabRequest(); } if(req != null) { - if(logMINOR) Logger.minor(this, "Running "+req+" priority "+req.prio); + if(logMINOR) Logger.minor(this, "Running "+req+" priority "+req.getPriority()); // Wait long delay = throttle.getDelay(); if(logMINOR) Logger.minor(this, "Delay="+delay+" from "+throttle); @@ -161,7 +156,7 @@ // Always take the lock on RequestStarter first. AFAICS we don't synchronize on RequestStarter anywhere else. // Nested locks here prevent extra latency when there is a race, and therefore allow us to sleep indefinitely synchronized(this) { - req = getRequest(logMINOR); + req = sched.grabRequest(); if(req == null) { try { wait(100*1000); // as close to indefinite as I'm comfortable with! Toad @@ -174,7 +169,7 @@ if(req == null) continue; if(!startRequest(req, logMINOR)) { // Don't log if it's a cancelled transient request. - if(!((!req.isPersistent()) && req.request.isCancelled(null))) + if(!((!req.isPersistent()) && req.isCancelled())) Logger.normal(this, "No requests to start on "+req); } req = null; @@ -182,64 +177,12 @@ } } - /** - * Pull a request from the from-the-database-thread queue. Then ask for a higher priority non-persistent request. - * If there isn't one, use the one we just pulled; if there is one, put it back on the queue. - * - * Obviously, there will be a slightly higher latency for the database queue; for example, when adding a new - * higher priority persistent request, we have a queue-length penalty before starting to request it. We also - * have a significant penalty from all the database access (even if the request itself is cached, the database - * thread is probably doing other things so we have to wait for that to finish). - * @return - */ - private ChosenRequest getRequest(boolean logMINOR) { - boolean usedReq = true; - ChosenRequest req = null; - while(true) { - synchronized(queue) { - if(queue.isEmpty()) break; - req = (ChosenRequest) queue.removeFirst(); - } - if((!req.isPersistent()) && req.request.isCancelled(null)) continue; - break; - } - ChosenRequest betterReq = sched.getBetterNonPersistentRequest(req); - if(req != null) { - if(betterReq != null) { - if(logMINOR) - Logger.minor(this, "Not using "+req+" in favour of "+betterReq); - synchronized(queue) { - queue.addFirst(req); - queue.remove(betterReq); - } - req = null; - usedReq = false; - } - } - if(req == null) { - usedReq = false; - req = betterReq; - } - if(usedReq || req == null) - sched.queueFillRequestStarterQueue(); - if(req == null && logMINOR) Logger.minor(this, "No requests found"); - if(req != null && !this.isInsert) { - if(!sched.addToFetching(req.key)) { - sched.requeue(req); - Logger.error(this, "Skipping request as duplicate: "+req); - req = null; - } - } - return req; - } - - private boolean startRequest(ChosenRequest req, boolean logMINOR) { - if((!req.isPersistent()) && req.request.isCancelled(null)) { + private boolean startRequest(ChosenBlock req, boolean logMINOR) { + if((!req.isPersistent()) && req.isCancelled()) { sched.removeFetchingKey(req.key); - sched.removeChosenRequest(req); return false; } - if(logMINOR) Logger.minor(this, "Running request "+req+" priority "+req.prio); + if(logMINOR) Logger.minor(this, "Running request "+req+" priority "+req.getPriority()); core.getExecutor().execute(new SenderThread(req, req.key), "RequestStarter$SenderThread for "+req); return true; } @@ -259,10 +202,10 @@ private class SenderThread implements Runnable { - private final ChosenRequest req; + private final ChosenBlock req; private final Key key; - public SenderThread(ChosenRequest req, Key key) { + public SenderThread(ChosenBlock req, Key key) { this.req = req; this.key = key; } @@ -274,18 +217,13 @@ if (key != null) stats.reportOutgoingLocalRequestLocation(key.toNormalizedDouble()); if(!req.send(core, sched)) { - if(!((!req.isPersistent()) && req.request.isCancelled(null))) + if(!((!req.isPersistent()) && req.isCancelled())) Logger.error(this, "run() not able to send a request on "+req); else Logger.normal(this, "run() not able to send a request on "+req+" - request was cancelled"); } if(Logger.shouldLog(Logger.MINOR, this)) Logger.minor(this, "Finished "+req); - } catch (Throwable t) { - // Remove it if something is thrown. - // But normally send(), callFailure() or callSuccess() will remove it. - Logger.error(this, "Caught "+t, t); - sched.removeChosenRequest(req); } finally { sched.removeFetchingKey(key); } @@ -300,6 +238,10 @@ } public boolean exclude(RandomGrabArrayItem item, ObjectContainer container, ClientContext context) { + if(sched.isRunningRequest((SendableRequest)item)) { + Logger.normal(this, "Excluding already-running request: "+item); + return true; + } if(isInsert) return false; if(!(item instanceof BaseSendableGet)) { Logger.error(this, "On a request scheduler, exclude() called with "+item, new Exception("error")); Modified: branches/db4o/freenet/src/freenet/node/SendableGet.java =================================================================== --- branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -6,14 +6,12 @@ import com.db4o.ObjectContainer; import freenet.client.FetchContext; -import freenet.client.async.ChosenRequest; import freenet.client.async.ClientContext; import freenet.client.async.ClientRequestScheduler; import freenet.client.async.ClientRequester; import freenet.keys.ClientKey; import freenet.keys.ClientKeyBlock; import freenet.keys.Key; -import freenet.support.Logger; import freenet.support.io.NativeThread; /** @@ -55,50 +53,12 @@ this.parent = parent; } - /** Do the request, blocking. Called by RequestStarter. - * Also responsible for deleting it. - * @return True if a request was executed. False if caller should try to find another request, and remove - * this one from the queue. */ - public boolean send(NodeClientCore core, final RequestScheduler sched, ChosenRequest req) { - Object keyNum = req.token; - ClientKey key = req.ckey; - if(key == null) { - Logger.error(this, "Key is null in send(): keyNum = "+keyNum+" for "+this); - return false; - } - if(Logger.shouldLog(Logger.MINOR, this)) - Logger.minor(this, "Sending get for key "+keyNum+" : "+key); - boolean logMINOR = Logger.shouldLog(Logger.MINOR, this); - if((!req.isPersistent()) && isCancelled(null)) { - if(logMINOR) Logger.minor(this, "Cancelled: "+this); - sched.callFailure(this, new LowLevelGetException(LowLevelGetException.CANCELLED), keyNum, NativeThread.NORM_PRIORITY+1, req, req.isPersistent()); - return false; - } - try { - try { - core.realGetKey(key, req.localRequestOnly, req.cacheLocalRequests, req.ignoreStore); - } catch (final LowLevelGetException e) { - sched.callFailure(this, e, keyNum, NativeThread.NORM_PRIORITY+1, req, req.isPersistent()); - return true; - } catch (Throwable t) { - Logger.error(this, "Caught "+t, t); - sched.callFailure(this, new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), keyNum, NativeThread.NORM_PRIORITY+1, req, req.isPersistent()); - return true; - } - // We must remove the request even in this case. - // On other paths, callFailure() will do the removal. - sched.removeChosenRequest(req); - // Don't call onSuccess(), it will be called for us by backdoor coalescing. - sched.succeeded(this, req); - - } catch (Throwable t) { - Logger.error(this, "Caught "+t, t); - sched.callFailure(this, new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), keyNum, NativeThread.NORM_PRIORITY+1, req, req.isPersistent()); - return true; - } - return true; + static final SendableGetRequestSender sender = new SendableGetRequestSender(); + + public SendableRequestSender getSender(ObjectContainer container, ClientContext context) { + return sender; } - + public ClientRequestScheduler getScheduler(ClientContext context) { if(isSSK()) return context.getSskFetchScheduler(); @@ -119,8 +79,11 @@ /** Reset the cooldown times when the request is reregistered. */ public abstract void resetCooldownTimes(ObjectContainer container); - public void internalError(final Object keyNum, final Throwable t, final RequestScheduler sched, ObjectContainer container, ClientContext context, boolean persistent) { - sched.callFailure(this, new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, t.getMessage(), t), keyNum, NativeThread.MAX_PRIORITY, null, persistent); + /** + * An internal error occurred, effecting this SendableGet, independantly of any ChosenBlock's. + */ + public void internalError(final Throwable t, final RequestScheduler sched, ObjectContainer container, ClientContext context, boolean persistent) { + sched.callFailure(this, new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, t.getMessage(), t), NativeThread.MAX_PRIORITY, persistent); } /** Added: branches/db4o/freenet/src/freenet/node/SendableGetRequestSender.java =================================================================== --- branches/db4o/freenet/src/freenet/node/SendableGetRequestSender.java (rev 0) +++ branches/db4o/freenet/src/freenet/node/SendableGetRequestSender.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -0,0 +1,49 @@ +package freenet.node; + +import freenet.client.async.ChosenBlock; +import freenet.client.async.ClientContext; +import freenet.keys.ClientKey; +import freenet.support.Logger; + +public class SendableGetRequestSender implements SendableRequestSender { + + /** Do the request, blocking. Called by RequestStarter. + * Also responsible for deleting it. + * @return True if a request was executed. False if caller should try to find another request, and remove + * this one from the queue. */ + public boolean send(NodeClientCore core, final RequestScheduler sched, ClientContext context, ChosenBlock req) { + Object keyNum = req.token; + ClientKey key = req.ckey; + if(key == null) { + Logger.error(SendableGet.class, "Key is null in send(): keyNum = "+keyNum+" for "+req); + return false; + } + boolean logMINOR = Logger.shouldLog(Logger.MINOR, this); + if(Logger.shouldLog(Logger.MINOR, SendableGet.class)) + Logger.minor(SendableGet.class, "Sending get for key "+keyNum+" : "+key); + if(req.isCancelled()) { + if(logMINOR) Logger.minor(SendableGet.class, "Cancelled: "+req); + req.onFailure(new LowLevelGetException(LowLevelGetException.CANCELLED), context); + return false; + } + try { + try { + core.realGetKey(key, req.localRequestOnly, req.cacheLocalRequests, req.ignoreStore); + } catch (final LowLevelGetException e) { + req.onFailure(e, context); + return true; + } catch (Throwable t) { + Logger.error(this, "Caught "+t, t); + req.onFailure(new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), context); + return true; + } + req.onFetchSuccess(context); + } catch (Throwable t) { + Logger.error(this, "Caught "+t, t); + req.onFailure(new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), context); + return true; + } + return true; + } + +} Modified: branches/db4o/freenet/src/freenet/node/SendableInsert.java =================================================================== --- branches/db4o/freenet/src/freenet/node/SendableInsert.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/node/SendableInsert.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -6,6 +6,7 @@ import com.db4o.ObjectContainer; import freenet.client.async.ClientContext; +import freenet.support.io.NativeThread; /** * Callback interface for a low level insert, which is immediately sendable. These @@ -25,8 +26,8 @@ /** Called when we don't! */ public abstract void onFailure(LowLevelPutException e, Object keyNum, ObjectContainer container, ClientContext context); - public void internalError(Object keyNum, Throwable t, RequestScheduler sched, ObjectContainer container, ClientContext context, boolean persistent) { - onFailure(new LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, t.getMessage(), t), keyNum, container, context); + public void internalError(Throwable t, RequestScheduler sched, ObjectContainer container, ClientContext context, boolean persistent) { + sched.callFailure(this, new LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, t.getMessage(), t), NativeThread.MAX_PRIORITY, persistent); } } Modified: branches/db4o/freenet/src/freenet/node/SendableRequest.java =================================================================== --- branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -1,11 +1,15 @@ package freenet.node; +import java.util.List; + import com.db4o.ObjectContainer; -import freenet.client.async.ChosenRequest; +import freenet.client.async.ChosenBlock; import freenet.client.async.ClientContext; import freenet.client.async.ClientRequestScheduler; import freenet.client.async.ClientRequester; +import freenet.client.async.PersistentChosenBlock; +import freenet.client.async.PersistentChosenRequest; import freenet.support.Logger; import freenet.support.RandomGrabArray; import freenet.support.RandomGrabArrayItem; @@ -54,18 +58,14 @@ * currently running, on the cooldown queue etc. */ public abstract Object[] sendableKeys(ObjectContainer container); - /** ONLY called by RequestStarter. Start the actual request using the NodeClientCore - * provided, and the key and key number earlier got from chooseKey(). - * The request itself may have been removed from the overall queue already. For - * persistent requests, the callbacks will be called on the database thread, and we - * will delete the PersistentChosenRequest from there before committing. - * @param sched The scheduler this request has just been grabbed from. - * @param keyNum The key number that was fed into getKeyObject(). - * @param key The key returned from grabKey(). - * @param ckey The client key for decoding, if available (mandatory for SendableGet, null otherwise). - * @return True if a request was sent, false otherwise (in which case the request will - * be removed if it hasn't already been). */ - public abstract boolean send(NodeClientCore node, RequestScheduler sched, ChosenRequest request); + /** + * Get or create a SendableRequestSender for this object. This is a non-persistent + * object used to send the requests. @see SendableGet.getSender(). + * @param container A database handle may be necessary for creating it. + * @param context A client context may also be necessary. + * @return + */ + public abstract SendableRequestSender getSender(ObjectContainer container, ClientContext context); /** If true, the request has been cancelled, or has completed, either way it need not * be registered any more. isEmpty() on the other hand means there are no queued blocks. @@ -127,6 +127,10 @@ public abstract boolean isSSK(); /** Requeue after an internal error */ - public abstract void internalError(Object keyNum, Throwable t, RequestScheduler sched, ObjectContainer container, ClientContext context, boolean persistent); + public abstract void internalError(Throwable t, RequestScheduler sched, ObjectContainer container, ClientContext context, boolean persistent); + /** Construct a full set of ChosenBlock's for a persistent request. These are transient, so we will need to clone keys + * etc. */ + public abstract List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest request, RequestScheduler sched, ObjectContainer container, ClientContext context); + } Added: branches/db4o/freenet/src/freenet/node/SendableRequestSender.java =================================================================== --- branches/db4o/freenet/src/freenet/node/SendableRequestSender.java (rev 0) +++ branches/db4o/freenet/src/freenet/node/SendableRequestSender.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -0,0 +1,26 @@ +package freenet.node; + +import freenet.client.async.ChosenBlock; +import freenet.client.async.ClientContext; + +/** + * Interface for class responsible for doing the actual sending of requests. + * Strictly non-persistent. + * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450) + */ +public interface SendableRequestSender { + + /** ONLY called by RequestStarter. Start the actual request using the NodeClientCore + * provided, and the key and key number earlier got from chooseKey(). + * The request itself may have been removed from the overall queue already. For + * persistent requests, the callbacks will be called on the database thread, and we + * will delete the PersistentChosenRequest from there before committing. + * @param sched The scheduler this request has just been grabbed from. + * @param keyNum The key number that was fed into getKeyObject(). + * @param key The key returned from grabKey(). + * @param ckey The client key for decoding, if available (mandatory for SendableGet, null otherwise). + * @return True if a request was sent, false otherwise (in which case the request will + * be removed if it hasn't already been). */ + public abstract boolean send(NodeClientCore node, RequestScheduler sched, ClientContext context, ChosenBlock request); + +} Modified: branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java =================================================================== --- branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java 2008-08-12 19:25:54 UTC (rev 21772) +++ branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java 2008-08-12 19:42:51 UTC (rev 21773) @@ -3,12 +3,16 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.node; +import java.util.List; + import com.db4o.ObjectContainer; -import freenet.client.async.ChosenRequest; +import freenet.client.async.ChosenBlock; import freenet.client.async.ClientContext; import freenet.client.async.ClientRequestScheduler; import freenet.client.async.ClientRequester; +import freenet.client.async.PersistentChosenBlock; +import freenet.client.async.PersistentChosenRequest; import freenet.keys.CHKBlock; import freenet.keys.ClientKey; import freenet.keys.KeyBlock; @@ -72,22 +76,28 @@ return 0; } - public boolean send(NodeClientCore core, RequestScheduler sched, ChosenRequest req) { - // Ignore keyNum, key, since this is a single block - boolean logMINOR = Logger.shouldLog(Logger.MINOR, this); - try { - if(logMINOR) Logger.minor(this, "Starting request: "+this); - core.realPut(block, shouldCache()); - } catch (LowLevelPutException e) { - sched.callFailure(this, e, req.token, NativeThread.NORM_PRIORITY, req, false); - if(logMINOR) Logger.minor(this, "Request failed: "+this+" for "+e); - return true; - } finally { - finished = true; - } - if(logMINOR) Logger.minor(this, "Request succeeded: "+this); - sched.callSuccess(this, req.token, NativeThread.NORM_PRIORITY, req, false); - return true; + @Override + public SendableRequestSender getSender(ObjectContainer container, ClientContext context) { + return new SendableRequestSender() { + + public boolean send(NodeClientCore core, RequestScheduler sched, ClientContext context, ChosenBlock req) { + // Ignore keyNum, key, since this is a single block + boolean logMINOR = Logger.shouldLog(Logger.MINOR, this); + try { + if(logMINOR) Logger.minor(this, "Starting request: "+this); + core.realPut(block, shouldCache()); + } catch (LowLevelPutException e) { + onFailure(e, req.token, null, context); + if(logMINOR) Logger.minor(this, "Request failed: "+this+" for "+e); + return true; + } finally { + finished = true; + } + if(logMINOR) Logger.minor(this, "Request succeeded: "+this); + onSuccess(req.token, null, context); + return true; + } + }; } public RequestClient getClient() { @@ -146,4 +156,10 @@ public boolean isSSK() { return block instanceof SSKBlock; } + + @Override + public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest request, RequestScheduler sched, ObjectContainer container, ClientContext context) { + // Transient-only so no makeBlocks(). + throw new UnsupportedOperationException(); + } }
