Author: toad
Date: 2009-04-18 23:22:39 +0000 (Sat, 18 Apr 2009)
New Revision: 27023
Modified:
trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
trunk/freenet/src/freenet/client/async/USKFetcher.java
trunk/freenet/src/freenet/client/async/USKManager.java
Log:
Check the next 50 datastore slots before starting actual requests.
Modified: trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
2009-04-18 20:17:47 UTC (rev 27022)
+++ trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
2009-04-18 23:22:39 UTC (rev 27023)
@@ -396,6 +396,7 @@
for(int i=0;i<getters.length;i++) {
SendableGet getter = getters[i];
container.activate(getter, 1);
+ getter.preRegister(container,
clientContext);
if(!(getter.isCancelled(container) || getter.isEmpty(container))) {
wereAnyValid = true;
schedCore.innerRegister(getter, random, container, getters);
@@ -422,6 +423,7 @@
if(container.ext().isActive(getter))
Logger.error(this, "ALREADY ACTIVE in delayed finishRegister: "+getter);
container.activate(getter, 1);
+
getter.preRegister(container, clientContext);
if(!(getter.isCancelled(container) || getter.isEmpty(container))) {
wereAnyValid =
true;
schedCore.innerRegister(getter, random, container, getters);
@@ -442,8 +444,11 @@
} else {
if(!anyValid) return;
// Register immediately.
- for(int i=0;i<getters.length;i++)
+ for(int i=0;i<getters.length;i++) {
+ getters[i].preRegister(container,
clientContext);
+ if(getters[i].isCancelled(null) ||
getters[i].isEmpty(null)) continue;
schedTransient.innerRegister(getters[i],
random, null, getters);
+ }
starter.wakeUp();
}
}
Modified: trunk/freenet/src/freenet/client/async/USKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKFetcher.java 2009-04-18
20:17:47 UTC (rev 27022)
+++ trunk/freenet/src/freenet/client/async/USKFetcher.java 2009-04-18
23:22:39 UTC (rev 27023)
@@ -8,11 +8,13 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Vector;
import com.db4o.ObjectContainer;
import freenet.client.FetchContext;
+import freenet.keys.ClientKey;
import freenet.keys.ClientSSK;
import freenet.keys.ClientSSKBlock;
import freenet.keys.FreenetURI;
@@ -21,8 +23,13 @@
import freenet.keys.KeyDecodeException;
import freenet.keys.SSKBlock;
import freenet.keys.USK;
+import freenet.node.KeysFetchingLocally;
+import freenet.node.LowLevelGetException;
+import freenet.node.RequestClient;
+import freenet.node.RequestScheduler;
import freenet.node.RequestStarter;
import freenet.node.SendableGet;
+import freenet.node.SendableRequestItem;
import freenet.support.LogThresholdCallback;
import freenet.support.Logger;
import freenet.support.api.Bucket;
@@ -118,8 +125,10 @@
/** A list of keys which we are interested in. This a sequence of SSKs
starting
* at the last known slot. */
private final ArrayList<ClientSSK> keysWatching;
+ private long checkedDatastoreUpTo;
+ private final ArrayList<USKAttempt> attemptsToStart;
- private static final int WATCH_KEYS = 500;
+ private static final int WATCH_KEYS = 50;
/**
* Callbacks are told when the USKFetcher finishes, and unless
background poll is
@@ -269,6 +278,7 @@
this.backgroundPoll = pollForever;
this.keepLastData = keepLastData;
keysWatching = new ArrayList<ClientSSK>();
+ attemptsToStart = new ArrayList<USKAttempt>();
}
void onDNF(USKAttempt att, ClientContext context) {
@@ -354,7 +364,6 @@
}
void onSuccess(USKAttempt att, boolean dontUpdate, ClientSSKBlock
block, final ClientContext context) {
- LinkedList<USKAttempt> l = null;
final long lastEd = uskManager.lookupLatestSlot(origUSK);
long curLatest;
boolean decode = false;
@@ -364,19 +373,18 @@
if(completed || cancelled) return;
decode = curLatest >= lastEd && !(dontUpdate && block
== null);
curLatest = Math.max(lastEd, curLatest);
- fillKeysWatching(curLatest);
if(logMINOR) Logger.minor(this, "Latest: "+curLatest);
long addTo = curLatest + minFailures;
long addFrom = Math.max(lastAddedEdition + 1, curLatest
+ 1);
if(logMINOR) Logger.minor(this, "Adding from
"+addFrom+" to "+addTo+" for "+origUSK);
if(addTo >= addFrom) {
- l = new LinkedList<USKAttempt>();
for(long i=addFrom;i<=addTo;i++) {
if(logMINOR) Logger.minor(this, "Adding
checker for edition "+i+" for "+origUSK);
- l.add(add(i));
+ attemptsToStart.add(add(i));
}
}
cancelBefore(curLatest, context);
+ fillKeysWatching(curLatest+1, context);
}
Bucket data = null;
if(decode) {
@@ -403,29 +411,6 @@
}
if(!dontUpdate)
uskManager.updateSlot(origUSK, curLatest, context);
- if(l == null) return;
- final LinkedList<USKAttempt> toSched = l;
- // If we schedule them here, we don't get icky recursion
problems.
- if(!cancelled) {
- context.mainExecutor.execute(new Runnable() {
- public void run() {
- long last = lastEd;
- for(Iterator<USKAttempt>
i=toSched.iterator();i.hasNext();) {
- // We may be called recursively
through onSuccess().
- // So don't start obsolete
requests.
- USKAttempt a = i.next();
- last =
uskManager.lookupLatestSlot(origUSK);
- if((last <= a.number) &&
!a.cancelled)
- a.schedule(null,
context);
- else {
- synchronized(this) {
-
runningAttempts.remove(a);
- }
- }
- }
- }
- }, "USK scheduler"); // Run on separate thread because
otherwise can get loooong recursions
- }
}
void onCancelled(USKAttempt att, ClientContext context) {
@@ -534,29 +519,13 @@
long lookedUp = uskManager.lookupLatestSlot(origUSK);
synchronized(this) {
valueAtSchedule = Math.max(lookedUp, valueAtSchedule);
- fillKeysWatching(valueAtSchedule);
if(cancelled) return;
long startPoint = Math.max(origUSK.suggestedEdition,
valueAtSchedule);
for(long i=startPoint;i<startPoint+minFailures;i++)
- add(i);
- attempts = runningAttempts.toArray(new
USKAttempt[runningAttempts.size()]);
+ attemptsToStart.add(add(i));
started = true;
+ fillKeysWatching(valueAtSchedule, context);
}
- if(!cancelled) {
- for(int i=0;i<attempts.length;i++) {
- // Race conditions happen here and waste a lot
more time than this simple check.
- long lastEd =
uskManager.lookupLatestSlot(origUSK);
- if(keepLastData && lastEd == lookedUp)
- lastEd--; // If we want the data, then
get it for the known edition, so we always get the data, so USKInserter can
compare it and return the old edition if it is identical.
- if(attempts[i].number > lastEd)
- attempts[i].schedule(container,
context);
- else {
- synchronized(this) {
-
runningAttempts.remove(attempts[i]);
- }
- }
- }
- }
}
public void cancel(ObjectContainer container, ClientContext context) {
@@ -695,26 +664,24 @@
public void onFoundEdition(long ed, USK key, ObjectContainer container,
final ClientContext context, boolean metadata, short codec, byte[] data,
boolean newKnownGood, boolean newSlotToo) {
if(newKnownGood && !newSlotToo) return; // Only interested in
slots
- LinkedList<USKAttempt> l = null;
final long lastEd = uskManager.lookupLatestSlot(origUSK);
boolean decode = false;
synchronized(this) {
if(completed || cancelled) return;
decode = lastEd >= ed && data != null;
ed = Math.max(lastEd, ed);
- fillKeysWatching(ed);
if(logMINOR) Logger.minor(this, "Latest: "+ed);
long addTo = ed + minFailures;
long addFrom = Math.max(lastAddedEdition + 1, ed + 1);
if(logMINOR) Logger.minor(this, "Adding from
"+addFrom+" to "+addTo+" for "+origUSK);
if(addTo >= addFrom) {
- l = new LinkedList<USKAttempt>();
for(long i=addFrom;i<=addTo;i++) {
if(logMINOR) Logger.minor(this, "Adding
checker for edition "+i+" for "+origUSK);
- l.add(add(i));
+ attemptsToStart.add(add(i));
}
}
cancelBefore(ed, context);
+ fillKeysWatching(ed+1, context);
}
synchronized(this) {
if (decode) {
@@ -732,32 +699,12 @@
}
}
}
- if(l == null) return;
- final LinkedList<USKAttempt> toSched = l;
- // If we schedule them here, we don't get icky recursion
problems.
- if(!cancelled) {
- context.mainExecutor.execute(new Runnable() {
- public void run() {
- long last = lastEd;
- for(Iterator<USKAttempt>
i=toSched.iterator();i.hasNext();) {
- // We may be called recursively
through onSuccess().
- // So don't start obsolete
requests.
- USKAttempt a = i.next();
- last =
uskManager.lookupLatestSlot(origUSK);
- if((last <= a.number) &&
!a.cancelled)
- a.schedule(null,
context);
- else {
- synchronized(this) {
-
runningAttempts.remove(a);
- }
- }
- }
- }
- }, "USK scheduler"); // Run on separate thread because
otherwise can get loooong recursions
- }
}
- private synchronized void fillKeysWatching(long ed) {
+ private boolean runningStoreChecker = false;
+
+ private synchronized void fillKeysWatching(long ed, ClientContext
context) {
+ if(logMINOR) Logger.minor(this, "fillKeysWatching from "+ed+"
for "+this+" : "+origUSK, new Exception("debug"));
// if(firstKeyWatching == -1) {
// firstKeyWatching = ed;
// for(int i=0;i<WATCH_KEYS;i++) {
@@ -785,6 +732,171 @@
// }
firstKeyWatching = ed;
// }
+ if(runningStoreChecker) return;
+ long firstCheck = Math.max(firstKeyWatching,
checkedDatastoreUpTo);
+ final long lastCheck = firstKeyWatching + keysWatching.size() -
1;
+ if(lastCheck < firstCheck) return;
+ int checkCount = (int) (lastCheck - firstCheck + 1);
+ int offset = (int) (firstCheck - firstKeyWatching);
+ final Key[] checkStore = new Key[checkCount];
+ for(int i=0;i<checkStore.length;i++) {
+ checkStore[i] = keysWatching.get(i+offset).getNodeKey();
+ }
+ if(logMINOR) Logger.minor(this, "Checking from "+firstCheck+"
to "+lastCheck+" for "+this+" : "+origUSK);
+ SendableGet storeChecker = new SendableGet(parent) {
+
+ boolean done = false;
+
+ @Override
+ public boolean dontCache(ObjectContainer container) {
+ return false;
+ }
+
+ @Override
+ public FetchContext getContext() {
+ return this.getContext();
+ }
+
+ @Override
+ public long getCooldownWakeup(Object token,
ObjectContainer container) {
+ return -1;
+ }
+
+ @Override
+ public long getCooldownWakeupByKey(Key key,
ObjectContainer container) {
+ return -1;
+ }
+
+ @Override
+ public ClientKey getKey(Object token, ObjectContainer
container) {
+ return null;
+ }
+
+ @Override
+ public boolean ignoreStore() {
+ return false;
+ }
+
+ @Override
+ public Key[] listKeys(ObjectContainer container) {
+ return checkStore;
+ }
+
+ @Override
+ public void onFailure(LowLevelGetException e, Object
token, ObjectContainer container, ClientContext context) {
+ // Ignore
+ }
+
+ @Override
+ public void requeueAfterCooldown(Key key, long time,
ObjectContainer container, ClientContext context) {
+ // Ignore
+ }
+
+ @Override
+ public void resetCooldownTimes(ObjectContainer
container) {
+ // Ignore
+ }
+
+ @Override
+ public boolean hasValidKeys(KeysFetchingLocally
fetching, ObjectContainer container, ClientContext context) {
+ return true;
+ }
+
+ @Override
+ public void preRegister(ObjectContainer container,
ClientContext context) {
+ unregister(container, context);
+ USKAttempt[] attempts;
+ synchronized(USKFetcher.this) {
+ if(cancelled) return;
+ runningStoreChecker = false;
+ attempts = attemptsToStart.toArray(new
USKAttempt[attemptsToStart.size()]);
+ attemptsToStart.clear();
+ done = true;
+ }
+ if(logMINOR) Logger.minor(this, "Checked
datastore, finishing registration for "+attempts.length+" checkers for
"+USKFetcher.this+" for "+origUSK);
+ for(int i=0;i<attempts.length;i++) {
+ long lastEd =
uskManager.lookupLatestSlot(origUSK);
+ // FIXME not sure this
condition works, test it!
+ if(keepLastData &&
lastRequestData == null && lastEd == origUSK.suggestedEdition)
+ lastEd--; // If we want
the data, then get it for the known edition, so we always get the data, so
USKInserter can compare it and return the old edition if it is identical.
+ if(attempts[i].number > lastEd)
+
attempts[i].schedule(container, context);
+ else {
+ synchronized(this) {
+
runningAttempts.remove(attempts[i]);
+ }
+ }
+ }
+ }
+
+ @Override
+ public SendableRequestItem
chooseKey(KeysFetchingLocally keys, ObjectContainer container, ClientContext
context) {
+ return null;
+ }
+
+ @Override
+ public long countAllKeys(ObjectContainer container,
ClientContext context) {
+ return keysWatching.size();
+ }
+
+ @Override
+ public long countSendableKeys(ObjectContainer
container, ClientContext context) {
+ return 0;
+ }
+
+ @Override
+ public RequestClient getClient(ObjectContainer
container) {
+ synchronized(USKFetcher.this) {
+ checkedDatastoreUpTo = lastCheck;
+ }
+ return USKFetcher.this.uskManager;
+ }
+
+ @Override
+ public ClientRequester getClientRequest() {
+ return parent;
+ }
+
+ @Override
+ public short getPriorityClass(ObjectContainer
container) {
+ return progressPollPriority; // FIXME
+ }
+
+ @Override
+ public int getRetryCount() {
+ return 0;
+ }
+
+ @Override
+ public boolean isCancelled(ObjectContainer container) {
+ return done;
+ }
+
+ @Override
+ public boolean isSSK() {
+ return true;
+ }
+
+ @Override
+ public List<PersistentChosenBlock>
makeBlocks(PersistentChosenRequest request, RequestScheduler sched,
ObjectContainer container, ClientContext context) {
+ return null;
+ }
+
+ public boolean isEmpty(ObjectContainer container) {
+ return done;
+ }
+
+ };
+ runningStoreChecker = true;
+ try {
+ context.getSskFetchScheduler().register(null, new
SendableGet[] { storeChecker } , false, false, null, null, false);
+ } catch (KeyListenerConstructionException e1) {
+ // Impossible
+ runningStoreChecker = false;
+ } catch (Throwable t) {
+ runningStoreChecker = false;
+ Logger.error(this, "Unable to start: "+t, t);
+ }
}
public synchronized boolean isCancelled(ObjectContainer container) {
Modified: trunk/freenet/src/freenet/client/async/USKManager.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKManager.java 2009-04-18
20:17:47 UTC (rev 27022)
+++ trunk/freenet/src/freenet/client/async/USKManager.java 2009-04-18
23:22:39 UTC (rev 27023)
@@ -198,7 +198,7 @@
public void onFoundEdition(long l, USK
key, ObjectContainer container, ClientContext context, boolean metadata, short
codec, byte[] data, boolean newKnownGood, boolean newSlotToo) {
if(l <= min) return;
FreenetURI uri =
key.copy(l).getURI();
- final ClientGetter get = new
ClientGetter(new NullClientCallback(), uri, new FetchContext(fctx,
FetchContext.IDENTICAL_MASK, false, null),
RequestStarter.UPDATE_PRIORITY_CLASS, USKManager.this, new NullBucket(), null);
+ final ClientGetter get = new
ClientGetter(new NullClientCallback(), uri, new FetchContext(fctx,
FetchContext.IDENTICAL_MASK, false, null),
RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS, USKManager.this, new
NullBucket(), null);
try {
get.start(null,
context);
} catch (FetchException e) {
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs