Author: toad
Date: 2008-08-20 00:24:11 +0000 (Wed, 20 Aug 2008)
New Revision: 22046
Added:
branches/db4o/freenet/src/freenet/support/BinaryBloomFilter.java
branches/db4o/freenet/src/freenet/support/BloomFilter.java
branches/db4o/freenet/src/freenet/support/CountingBloomFilter.java
branches/db4o/freenet/src/freenet/support/NullBloomFilter.java
Removed:
branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java
branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java
Modified:
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/ClientContext.java
branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
branches/db4o/freenet/src/freenet/client/async/USKChecker.java
branches/db4o/freenet/src/freenet/client/async/USKFetcher.java
branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java
branches/db4o/freenet/src/freenet/client/async/USKInserter.java
branches/db4o/freenet/src/freenet/client/async/USKManager.java
branches/db4o/freenet/src/freenet/client/async/USKRetriever.java
branches/db4o/freenet/src/freenet/clients/http/StatisticsToadlet.java
branches/db4o/freenet/src/freenet/node/LowLevelGetException.java
branches/db4o/freenet/src/freenet/node/Node.java
branches/db4o/freenet/src/freenet/node/NodeClientCore.java
branches/db4o/freenet/src/freenet/node/SendableGet.java
branches/db4o/freenet/src/freenet/node/Version.java
branches/db4o/freenet/src/freenet/support/io/FilenameGenerator.java
Log:
Major optimisation: Bloom filters.
Partially working: activelinks load, but freesites don't, and persistent requests don't work yet either.
Will debug further.
Version bumped to 1158, but merging isn't urgent, since I skipped build 1156, which introduced the Big Bug that 1158 fixes.
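[Context note] The newly added freenet.support.BloomFilter, BinaryBloomFilter, CountingBloomFilter and NullBloomFilter classes are not shown in this diff. The sketch below only illustrates the kind of data structure the scheduler now relies on, a binary Bloom filter that answers "definitely not wanted" or "probably wanted" for a key; it is not the actual freenet.support implementation, and every name in it is made up.

// Illustration only: a minimal binary Bloom filter, NOT the added
// freenet.support.BinaryBloomFilter. A filter answers "probably contains"
// with possible false positives but no false negatives, which is why the
// scheduler code below always follows a probable match with an exact check.
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.BitSet;

public class BloomFilterSketch {

	private final BitSet bits;
	private final int sizeInBits;
	private final int hashCount; // assumed <= 8 so one SHA-256 digest suffices

	public BloomFilterSketch(int sizeInBits, int hashCount) {
		this.sizeInBits = sizeInBits;
		this.hashCount = hashCount;
		this.bits = new BitSet(sizeInBits);
	}

	private int[] positions(byte[] key) {
		byte[] d;
		try {
			d = MessageDigest.getInstance("SHA-256").digest(key);
		} catch (NoSuchAlgorithmException e) {
			throw new Error(e); // SHA-256 is always available
		}
		int[] pos = new int[hashCount];
		for(int i = 0; i < hashCount; i++) {
			// Take 4 bytes of the digest per probe position.
			int v = ((d[4*i] & 0xff) << 24) | ((d[4*i+1] & 0xff) << 16)
			      | ((d[4*i+2] & 0xff) << 8) | (d[4*i+3] & 0xff);
			pos[i] = (v & 0x7fffffff) % sizeInBits;
		}
		return pos;
	}

	public void add(byte[] key) {
		for(int p : positions(key)) bits.set(p);
	}

	/** May return true for a key never added; never returns false for one that was. */
	public boolean probablyContains(byte[] key) {
		for(int p : positions(key)) if(!bits.get(p)) return false;
		return true;
	}
}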
Modified: branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java	2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java	2008-08-20 00:24:11 UTC (rev 22046)
@@ -20,7 +20,7 @@
import freenet.node.SendableGet;
import freenet.support.Logger;
-public abstract class BaseSingleFileFetcher extends SendableGet implements GotKeyListener {
+public abstract class BaseSingleFileFetcher extends SendableGet implements HasKeyListener {
final ClientKey key;
protected boolean cancelled;
@@ -109,7 +109,7 @@
}
return true; // We will retry, just not yet. See requeueAfterCooldown(Key).
} else {
- schedule(container, context, false);
+ reschedule(container, context);
}
return true;
}
@@ -151,7 +151,7 @@
* Call unregister(container) if you only want to remove from the queue.
*/
public void unregisterAll(ObjectContainer container, ClientContext context) {
- getScheduler(context).removePendingKey(this, false, key.getNodeKey(), container);
+ getScheduler(context).removePendingKeys(this, false);
super.unregister(container, context);
}
@@ -238,15 +238,27 @@
}
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Requeueing after cooldown "+key+" for "+this);
- schedule(container, context, false);
+ reschedule(container, context);
if(persistent)
container.deactivate(this.key, 5);
}
- public void schedule(ObjectContainer container, ClientContext context, boolean delayed) {
- getScheduler(context).register(this, new SendableGet[] { this }, delayed, persistent, true, ctx.blocks, null);
+ public void schedule(ObjectContainer container, ClientContext context) {
+ try {
+ getScheduler(context).register(this, new SendableGet[] { this }, persistent, true, ctx.blocks, false);
+ } catch (KeyListenerConstructionException e) {
+ Logger.error(this, "Impossible: "+e+" on "+this, e);
+ }
 }
+ public void reschedule(ObjectContainer container, ClientContext context) {
+ try {
+ getScheduler(context).register(null, new SendableGet[] { this }, persistent, true, ctx.blocks, true);
+ } catch (KeyListenerConstructionException e) {
+ Logger.error(this, "Impossible: "+e+" on "+this, e);
+ }
+ }
+
public SendableGet getRequest(Key key, ObjectContainer container) {
return this;
}
@@ -270,4 +282,19 @@
return Collections.singletonList(block);
}
+ public KeyListener makeKeyListener(ObjectContainer container, ClientContext context) {
+ if(persistent) {
+ container.activate(key, 5);
+ container.activate(parent, 1);
+ container.activate(ctx, 1);
+ }
+ KeyListener ret = new SingleKeyListener(key.getNodeKey().cloneKey(), this, !ctx.cacheLocalRequests, parent.getPriorityClass(), persistent);
+ if(persistent) {
+ container.deactivate(key, 5);
+ container.deactivate(parent, 1);
+ container.deactivate(ctx, 1);
+ }
+ return ret;
+ }
+
}
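[Context note] The SingleKeyListener constructed in makeKeyListener() above, and the HasKeyListener/KeyListener interfaces themselves, are not part of this diff. The sketch below is a rough guess at the contract implied by the scheduler changes later in this commit (match on the salted key cheaply, then confirm against the real key); every type and signature in it is hypothetical, not the Freenet API.

import java.util.Arrays;

// Hypothetical sketch only: what a one-key listener needs to do, judging by
// the scheduler code later in this diff. The real SingleKeyListener and
// KeyListener classes are not shown in this commit.
class SingleKeySketch {
	private final byte[] routingKey; // the one key this fetcher is waiting for
	private final byte[] saltedKey;  // scheduler-salted hash of that key
	private volatile boolean done;

	SingleKeySketch(byte[] routingKey, byte[] saltedKey) {
		this.routingKey = routingKey.clone();
		this.saltedKey = saltedKey.clone();
	}

	// Cheap check used on every incoming block; false positives are acceptable.
	boolean probablyWantKey(byte[] salted) {
		return !done && Arrays.equals(saltedKey, salted);
	}

	// Exact check, run only after probablyWantKey() says yes.
	boolean definitelyWantKey(byte[] key) {
		return !done && Arrays.equals(routingKey, key);
	}

	void onFound() { done = true; }
}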
Modified: branches/db4o/freenet/src/freenet/client/async/ClientContext.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientContext.java	2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/ClientContext.java	2008-08-20 00:24:11 UTC (rev 22046)
@@ -18,6 +18,7 @@
import freenet.node.Ticker;
import freenet.support.Executor;
import freenet.support.api.BucketFactory;
+import freenet.support.io.FilenameGenerator;
import freenet.support.io.NativeThread;
/**
@@ -45,6 +46,7 @@
public transient final Random fastWeakRandom;
public transient final long bootID;
public transient final Ticker ticker;
+ public transient final FilenameGenerator fg;
public ClientContext(NodeClientCore core) {
this.bootID = core.node.bootID;
@@ -61,6 +63,7 @@
this.uskManager = core.uskManager;
fastWeakRandom = core.node.fastWeakRandom;
this.ticker = core.getTicker();
+ fg = core.tempFilenameGenerator;
}
public void init(RequestStarterGroup starters) {
Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetState.java	2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetState.java	2008-08-20 00:24:11 UTC (rev 22046)
@@ -11,7 +11,7 @@
*/
public interface ClientGetState {
- public void schedule(ObjectContainer container, ClientContext context, boolean delayedRegister);
+ public void schedule(ObjectContainer container, ClientContext context) throws KeyListenerConstructionException;
public void cancel(ObjectContainer container, ClientContext context);
Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetter.java	2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetter.java	2008-08-20 00:24:11 UTC (rev 22046)
@@ -114,11 +114,13 @@
return false;
}
}
- currentState.schedule(container, context, false);
+ currentState.schedule(container, context);
 }
 if(cancelled) cancel();
 } catch (MalformedURLException e) {
 throw new FetchException(FetchException.INVALID_URI, e);
+ } catch (KeyListenerConstructionException e) {
+ onFailure(e.getFetchException(), currentState, container, context);
}
if(persistent()) {
container.set(this);
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java	2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java	2008-08-20 00:24:11 UTC (rev 22046)
@@ -3,6 +3,7 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -10,10 +11,12 @@
import com.db4o.ObjectContainer;
import freenet.client.FECQueue;
+import freenet.client.FetchException;
import freenet.config.EnumerableOptionCallback;
import freenet.config.InvalidConfigValueException;
import freenet.config.SubConfig;
import freenet.crypt.RandomSource;
+import freenet.crypt.SHA256;
import freenet.keys.ClientKey;
import freenet.keys.Key;
import freenet.keys.KeyBlock;
@@ -100,7 +103,7 @@
private final CooldownQueue transientCooldownQueue;
private final CooldownQueue persistentCooldownQueue;
final PrioritizedSerialExecutor databaseExecutor;
- final PrioritizedSerialExecutor datastoreCheckerExecutor;
+ final DatastoreChecker datastoreChecker;
public final ClientContext clientContext;
final DBJobRunner jobRunner;
@@ -117,7 +120,7 @@
schedTransient = new ClientRequestSchedulerNonPersistent(this,
forInserts, forSSKs);
persistentCooldownQueue = schedCore.persistentCooldownQueue;
this.databaseExecutor = core.clientDatabaseExecutor;
- this.datastoreCheckerExecutor = core.datastoreCheckerExecutor;
+ this.datastoreChecker = core.storeChecker;
this.starter = starter;
this.random = random;
this.node = node;
@@ -174,7 +177,7 @@
boolean queueFull = jobRunner.getQueueSize(NativeThread.NORM_PRIORITY) >= QUEUE_THRESHOLD;
if(!queueFull)
bootID = this.node.bootID;
- final RegisterMe regme = new RegisterMe(null, null, req, req.getPriorityClass(selectorContainer), schedCore, null, bootID);
+ final RegisterMe regme = new RegisterMe(req, req.getPriorityClass(selectorContainer), schedCore, null, bootID);
selectorContainer.set(regme);
if(logMINOR)
Logger.minor(this, "Added insert RegisterMe: "+regme);
@@ -217,40 +220,41 @@
 * @param listener Listeners for specific keys. Can be null if the listener
 * is already registered e.g. most of the time with SplitFileFetcher*.
 * @param getters The actual requests to register to the request sender queue.
- * @param registerOffThread If true, create and store a RegisterMe to ensure
- * that the request is registered, but then schedule a job to complete it
- * after this job completes. Reduces the latency impact of scheduling a big
- * splitfile dramatically.
 * @param persistent True if the request is persistent.
 * @param onDatabaseThread True if we are running on the database thread.
 * NOTE: delayedStoreCheck/probablyNotInStore is unnecessary because we only
 * register the listener once.
+ * @throws FetchException
*/
- public void register(final GotKeyListener listener, final SendableGet[] getters, boolean registerOffThread, final boolean persistent, boolean onDatabaseThread, final BlockSet blocks, final RegisterMe oldReg) {
+ public void register(final HasKeyListener hasListener, final SendableGet[] getters, final boolean persistent, boolean onDatabaseThread, final BlockSet blocks, final boolean noCheckStore) throws KeyListenerConstructionException {
 logMINOR = Logger.shouldLog(Logger.MINOR, this);
 if(logMINOR)
- Logger.minor(this, "register("+persistent+","+listener+","+getters+","+registerOffThread);
+ Logger.minor(this, "register("+persistent+","+hasListener+","+getters);
 if(isInsertScheduler) {
 IllegalStateException e = new IllegalStateException("finishRegister on an insert scheduler");
 throw e;
 }
 if(persistent) {
 if(onDatabaseThread) {
- innerRegister(listener, getters, registerOffThread, persistent, blocks, oldReg);
+ innerRegister(hasListener, getters, blocks, noCheckStore);
 } else {
 jobRunner.queue(new DBJob() {
 public void run(ObjectContainer container, ClientContext context) {
 // registerOffThread would be pointless because this is a separate job.
- if(listener != null)
- container.activate(listener, 1);
+ if(hasListener != null)
+ container.activate(hasListener, 1);
 if(getters != null) {
 for(int i=0;i<getters.length;i++)
 container.activate(getters[i], 1);
 }
- innerRegister(listener, getters, false, persistent, blocks, oldReg);
- if(listener != null)
- container.deactivate(listener, 1);
+ try {
+ innerRegister(hasListener, getters, blocks, noCheckStore);
+ } catch (KeyListenerConstructionException e) {
+ Logger.error(this, "Registration failed to create Bloom filters: "+e+" on "+hasListener, e);
+ }
+ if(hasListener != null)
+ container.deactivate(hasListener, 1);
 if(getters != null) {
 for(int i=0;i<getters.length;i++)
 container.deactivate(getters[i], 1);
@@ -260,163 +264,63 @@
}, NativeThread.NORM_PRIORITY, false);
}
} else {
- if(listener != null) {
- final Key[] keys = listener.listKeys(null);
- schedTransient.addPendingKeys(listener, keys, null);
- short prio = listener.getPriorityClass(null);
- final boolean dontCache = listener.dontCache(null);
- for(int i=0;i<keys.length;i++) {
- if(keys[i].getRoutingKey() == null)
- throw new NullPointerException();
- }
- datastoreCheckerExecutor.execute(new Runnable() {
-
- public void run() {
- // Check the store, then queue the requests to the main queue.
- registerCheckStore(getters, false, keys, null, blocks, dontCache);
- }
-
- }, prio, "Checking datastore");
+ final KeyListener listener;
+ if(hasListener != null) {
+ listener = hasListener.makeKeyListener(selectorContainer, clientContext);
+ schedTransient.addPendingKeys(listener);
+ } else
+ listener = null;
+ if(getters != null && !noCheckStore) {
+ for(SendableGet getter : getters)
+ datastoreChecker.queueTransientRequest(getter, blocks);
 } else {
- this.finishRegister(getters, persistent, false, true, null);
+ boolean anyValid = false;
+ for(int i=0;i<getters.length;i++) {
+ if(!(getters[i].isCancelled(null) || getters[i].isEmpty(null)))
+ anyValid = true;
+ }
+ finishRegister(getters, false, onDatabaseThread, anyValid, null);
 }
 }
 }
- private void innerRegister(final GotKeyListener listener, final SendableGet[] getters, boolean registerOffThread, boolean persistent, final BlockSet blocks, RegisterMe reg) {
+ private void innerRegister(final HasKeyListener hasListener, final SendableGet[] getters, final BlockSet blocks, boolean noCheckStore) throws KeyListenerConstructionException {
+ final KeyListener listener;
+ if(hasListener != null) {
+ listener = hasListener.makeKeyListener(selectorContainer, clientContext);
+ schedCore.addPendingKeys(listener);
+ selectorContainer.set(hasListener);
+ } else
+ listener = null;
+
 if(isInsertScheduler) {
 IllegalStateException e = new IllegalStateException("finishRegister on an insert scheduler");
 throw e;
 }
- if(listener != null) {
- if(registerOffThread) {
- short prio =
listener.getPriorityClass(selectorContainer);
- boolean queueFull = false;
- if(reg == null) {
- long bootID = 0;
- queueFull =
jobRunner.getQueueSize(NativeThread.NORM_PRIORITY) >= QUEUE_THRESHOLD;
- if(!queueFull)
- bootID = this.node.bootID;
-
- reg = new RegisterMe(listener, getters,
null, prio, schedCore, blocks, bootID);
- selectorContainer.set(reg);
- }
- final RegisterMe regme = reg;
- if(logMINOR) Logger.minor(this, "Added regme:
"+regme);
- if(!queueFull) {
- jobRunner.queue(new DBJob() {
-
- public void run(ObjectContainer
container, ClientContext context) {
- if(listener != null)
-
container.activate(listener, 1);
- if(getters != null) {
- for(int
i=0;i<getters.length;i++)
-
container.activate(getters[i], 1);
- }
- register(listener, getters,
false, true, true, blocks, regme);
- if(listener != null)
-
container.deactivate(listener, 1);
- if(getters != null) {
- for(int
i=0;i<getters.length;i++)
-
container.deactivate(getters[i], 1);
- }
- }
-
- }, NativeThread.NORM_PRIORITY, false);
- } else {
-
schedCore.rerunRegisterMeRunner(jobRunner);
- }
- return;
- } else {
- short prio =
listener.getPriorityClass(selectorContainer);
- final Key[] keys =
listener.listKeys(selectorContainer);
- for(int i=0;i<keys.length;i++) {
- selectorContainer.activate(keys[i], 5);
- if(keys[i].getRoutingKey() == null)
- throw new
NullPointerException();
- }
- schedCore.addPendingKeys(listener, keys,
selectorContainer);
- if(reg == null && getters != null) {
- reg = new RegisterMe(null, getters,
null, prio, schedCore, blocks, node.bootID);
- selectorContainer.set(reg);
- if(logMINOR) Logger.minor(this, "Added
regme: "+reg);
- } else {
- if(reg != null)
- selectorContainer.delete(reg);
- reg = null; // Nothing to finish
registering.
- }
- final RegisterMe regme = reg;
- // Check the datastore before proceding.
- for(int i=0;i<keys.length;i++) {
- Key oldKey = keys[i];
- keys[i] = oldKey.cloneKey();
- selectorContainer.deactivate(oldKey, 5);
- }
- final boolean dontCache =
listener.dontCache(selectorContainer);
- datastoreCheckerExecutor.execute(new Runnable()
{
-
- public void run() {
- // Check the store, then queue
the requests to the main queue.
- registerCheckStore(getters,
true, keys, regme, blocks, dontCache);
- }
-
- }, prio, "Checking datastore");
- selectorContainer.deactivate(listener, 1);
- if(getters != null) {
- for(int i=0;i<getters.length;i++)
-
selectorContainer.deactivate(getters[i], 1);
- }
-
+ if(!noCheckStore) {
+ // Check the datastore before proceding.
+ for(SendableGet getter : getters)
+ datastoreChecker.queuePersistentRequest(getter, blocks, selectorContainer);
+ selectorContainer.deactivate(listener, 1);
+ if(getters != null) {
+ for(int i=0;i<getters.length;i++)
+ selectorContainer.deactivate(getters[i], 1);
 }
 } else {
- // The listener is already registered.
- // Ignore registerOffThread for now.
+ // We have already checked the datastore, this is a retry, the listener hasn't been unregistered.
 short prio = RequestStarter.MINIMUM_PRIORITY_CLASS;
 for(int i=0;i<getters.length;i++) {
 short p = getters[i].getPriorityClass(selectorContainer);
 if(p < prio) prio = p;
 }
- this.finishRegister(getters, persistent, true, true, null);
+ this.finishRegister(getters, true, true, true, null);
 }
 }
- protected void registerCheckStore(SendableGet[] getters, boolean persistent,
- Key[] keys, RegisterMe regme, BlockSet extraBlocks, boolean dontCache) {
+ void finishRegister(final SendableGet[] getters, boolean persistent, boolean onDatabaseThread, final boolean anyValid, final DatastoreCheckerItem reg) {
 if(isInsertScheduler && getters != null) {
 IllegalStateException e = new IllegalStateException("finishRegister on an insert scheduler");
- throw e;
- }
- boolean anyValid = false;
- for(int i=0;i<keys.length;i++) {
- Key key = keys[i];
- KeyBlock block = null;
- if(key == null) {
- if(logMINOR) Logger.minor(this, "No key at "+i);
- continue;
- } else {
- if(extraBlocks != null)
- block = extraBlocks.get(key);
- if(block == null)
- block = node.fetch(key, dontCache);
- if(block != null) {
- if(logMINOR)
- Logger.minor(this, "Got
"+block);
- }
- }
- if(block != null) {
- if(logMINOR) Logger.minor(this, "Found key");
- tripPendingKey(block);
- } else {
- anyValid = true;
- }
- }
- finishRegister(getters, persistent, false, anyValid, regme);
- }
-
- private void finishRegister(final SendableGet[] getters, boolean persistent, boolean onDatabaseThread, final boolean anyValid, final RegisterMe reg) {
- if(isInsertScheduler && getters != null) {
- IllegalStateException e = new IllegalStateException("finishRegister on an insert scheduler");
if(onDatabaseThread || !persistent) {
for(int i=0;i<getters.length;i++) {
if(persistent)
@@ -501,16 +405,6 @@
requestStarterQueueFiller.run(container, context);
}
- void addPendingKey(final ClientKey key, final GotKeyListener getter) {
- if(getter.persistent()) {
- if(!databaseExecutor.onThread()) {
- throw new IllegalStateException("Not on database thread!");
- }
- schedCore.addPendingKey(key.getNodeKey(), getter, selectorContainer);
- } else
- schedTransient.addPendingKey(key.getNodeKey(), getter, null);
- }
-
 public ChosenBlock getBetterNonPersistentRequest(short prio, int retryCount) {
short fuzz = -1;
if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
@@ -788,65 +682,30 @@
}
}
- public void removePendingKey(final GotKeyListener getter, final boolean
complain, final Key key, ObjectContainer container) {
- if(!getter.persistent()) {
- boolean dropped =
schedTransient.removePendingKey(getter, complain, key, container);
- if(dropped && offeredKeys != null &&
!node.peersWantKey(key)) {
- for(int i=0;i<offeredKeys.length;i++)
- offeredKeys[i].remove(key);
- }
- if(transientCooldownQueue != null) {
- SendableGet cooldownGetter =
getter.getRequest(key, container);
- if(cooldownGetter != null)
- transientCooldownQueue.removeKey(key,
cooldownGetter, cooldownGetter.getCooldownWakeupByKey(key, null), null);
- }
- } else if(container != null) {
- // We are on the database thread already.
- schedCore.removePendingKey(getter, complain, key,
container);
- if(persistentCooldownQueue != null) {
- SendableGet cooldownGetter =
getter.getRequest(key, container);
- container.activate(cooldownGetter, 1);
- persistentCooldownQueue.removeKey(key,
cooldownGetter, cooldownGetter.getCooldownWakeupByKey(key, container),
container);
- container.deactivate(cooldownGetter, 1);
- }
- } else {
- jobRunner.queue(new DBJob() {
-
- public void run(ObjectContainer container,
ClientContext context) {
- container.activate(getter, 1);
- schedCore.removePendingKey(getter,
complain, key, container);
- if(persistentCooldownQueue != null) {
- SendableGet cooldownGetter =
getter.getRequest(key, container);
-
container.activate(cooldownGetter, 1);
-
persistentCooldownQueue.removeKey(key, cooldownGetter,
cooldownGetter.getCooldownWakeupByKey(key, container), container);
-
container.deactivate(cooldownGetter, 1);
- }
- }
-
- }, NativeThread.NORM_PRIORITY, false);
- }
+ /**
+ * Remove a SendableGet from the list of getters we maintain for each key, indicating that we are no longer interested
+ * in that key.
+ * @param getter
+ * @param complain
+ */
+ public void removePendingKeys(KeyListener getter, boolean complain) {
+ boolean found = schedTransient.removePendingKeys(getter);
+ found |= schedCore.removePendingKeys(getter);
+ if(complain && !found)
+ Logger.error(this, "Listener not found when removing: "+getter);
 }
-
+
 /**
 * Remove a SendableGet from the list of getters we maintain for each key, indicating that we are no longer interested
 * in that key.
 * @param getter
 * @param complain
 */
- public void removePendingKeys(GotKeyListener getter, boolean complain) {
- ObjectContainer container;
- if(getter.persistent()) {
- container = selectorContainer;
- if(!databaseExecutor.onThread()) {
- throw new IllegalStateException("Not on database thread!");
- }
- } else {
- container = null;
- }
- Key[] keys = getter.listKeys(container);
- for(int i=0;i<keys.length;i++) {
- removePendingKey(getter, complain, keys[i], container);
- }
+ public void removePendingKeys(HasKeyListener getter, boolean complain) {
+ boolean found = schedTransient.removePendingKeys(getter);
+ found |= schedCore.removePendingKeys(getter);
+ if(complain && !found)
+ Logger.error(this, "Listener not found when removing: "+getter);
 }
 public void reregisterAll(final ClientRequester request, ObjectContainer container) {
@@ -886,78 +745,23 @@
public void tripPendingKey(final KeyBlock block) {
 if(logMINOR) Logger.minor(this, "tripPendingKey("+block.getKey()+")");
- // First the transient stuff
-
 if(offeredKeys != null) {
 for(int i=0;i<offeredKeys.length;i++) {
 offeredKeys[i].remove(block.getKey());
 }
 }
 final Key key = block.getKey();
- final GotKeyListener[] transientGets = schedTransient.removePendingKey(key, null);
- if(transientGets != null && transientGets.length > 0) {
- node.executor.execute(new Runnable() {
- public void run() {
- if(logMINOR) Logger.minor(this, "Running "+transientGets.length+" callbacks off-thread for "+block.getKey());
- for(int i=0;i<transientGets.length;i++) {
- try {
- if(logMINOR) Logger.minor(this, "Calling tripPendingKey() callback for "+transientGets[i]+" for "+key);
- transientGets[i].onGotKey(key, block, null, clientContext);
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t+" running tripPendingKey() callback "+transientGets[i]+" for "+key, t);
- }
- }
+ schedTransient.tripPendingKey(key, block, null, clientContext);
+ if(schedCore.anyProbablyWantKey(key, clientContext)) {
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer container, ClientContext context) {
+ if(logMINOR) Logger.minor(this, "tripPendingKey for "+key);
+ schedCore.tripPendingKey(key, block, container, clientContext);
 }
- }, "Running off-thread callbacks for "+block.getKey());
- if(transientCooldownQueue != null) {
- for(int i=0;i<transientGets.length;i++) {
- GotKeyListener got = transientGets[i];
- SendableGet req = got.getRequest(key, null);
- if(req == null) continue;
- transientCooldownQueue.removeKey(key, req, req.getCooldownWakeupByKey(key, null), null);
- }
- }
+ }, TRIP_PENDING_PRIORITY, false);
 }
- // Now the persistent stuff
-
- jobRunner.queue(new DBJob() {
-
- public void run(ObjectContainer container,
ClientContext context) {
- // FIXME is this necessary? the key is probably
non-persistent, no?
- container.activate(key, 5);
- if(logMINOR) Logger.minor(this, "tripPendingKey
for "+key);
- final GotKeyListener[] gets =
schedCore.removePendingKey(key, container);
- if(gets == null) return;
- if(persistentCooldownQueue != null) {
- for(int i=0;i<gets.length;i++) {
- GotKeyListener got = gets[i];
- container.activate(got, 1);
- SendableGet req =
got.getRequest(key, container);
- container.activate(req, 1);
- if(req == null) continue;
-
persistentCooldownQueue.removeKey(key, req, req.getCooldownWakeupByKey(key,
container), container);
- container.deactivate(req, 1);
- }
- }
- // Call the callbacks on the database executor
thread, because the first thing
- // they will need to do is access the database
to decide whether they need to
- // decode, and if so to find the key to decode
with.
- for(int i=0;i<gets.length;i++) {
- try {
- if(logMINOR) Logger.minor(this,
"Calling tripPendingKey() callback for "+gets[i]+" for "+key);
- container.activate(gets[i], 1);
- gets[i].onGotKey(key, block,
container, context);
- container.deactivate(gets[i],
1);
- } catch (Throwable t) {
- Logger.error(this, "Caught
"+t+" running tripPendingKey() callback "+gets[i]+" for "+key, t);
- }
- }
- if(logMINOR) Logger.minor(this, "Finished
running tripPendingKey() callbacks");
- }
-
- }, TRIP_PENDING_PRIORITY, false);
-
}
/** If we want the offered key, or if force is enabled, queue it */
@@ -969,7 +773,7 @@
// FIXME what priority???
priority =
RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS;
}
- priority = schedTransient.getKeyPrio(key, priority, null);
+ priority = schedTransient.getKeyPrio(key, priority, null, clientContext);
if(priority < Short.MAX_VALUE) {
offeredKeys[priority].queueKey(key);
starter.wakeUp();
@@ -981,7 +785,7 @@
 public void run(ObjectContainer container, ClientContext context) {
 // Don't activate/deactivate the key, because it's not persistent in the first place!!
- short priority = schedCore.getKeyPrio(key, oldPrio, container);
+ short priority = schedCore.getKeyPrio(key, oldPrio, container, context);
 if(priority >= oldPrio) return; // already on list at >= priority
offeredKeys[priority].queueKey(key.cloneKey());
starter.wakeUp();
@@ -1027,6 +831,18 @@
 * Only go around once. We will be called again. If there are keys to move, then RequestStarter will not
 * sleep, because it will start them. Then it will come back here. If we are off-thread i.e. on the database
 * thread, then we will wake it up if we find keys... and we'll be scheduled again.
+ *
+ * FIXME: I think we need to restore all the listeners for a single key
+ * simultaneously to avoid some kind of race condition? Or could we just
+ * restore the one request on the queue? Maybe it's just a misguided
+ * optimisation? IIRC we had some severe problems when we didn't have
+ * this, related to requests somehow being lost altogether... Is it
+ * essential? We can save a query if it's not... Is this about requests
+ * or about keys? Should we limit all requests across any
+ * SendableRequest's to 3 every half hour for a specific key? Probably
+ * yes...? In which case, can the cooldown queue be entirely in RAM,
+ * and would it be useful for it to be? Less disk, more RAM... for fast
+ * nodes with little RAM it would be bad...
*/
final int MAX_KEYS = 20;
Key[] keys = queue.removeKeyBefore(now, container, MAX_KEYS);
@@ -1036,47 +852,20 @@
if(persistent)
container.activate(key, 5);
if(logMINOR) Logger.minor(this, "Restoring key: "+key);
- GotKeyListener[] gets = schedCore.getClientsForPendingKey(key, container);
- GotKeyListener[] transientGets = schedTransient.getClientsForPendingKey(key, null);
- if(gets == null && transientGets == null) {
+ SendableGet[] reqs = schedCore.requestsForKey(key, container, clientContext);
+ SendableGet[] transientReqs = schedTransient.requestsForKey(key, container, clientContext);
+ if(reqs == null && transientReqs == null) {
 // Not an error as this can happen due to race conditions etc.
 if(logMINOR) Logger.minor(this, "Restoring key but no keys queued?? for "+key);
- continue;
- } else {
- if(gets != null) {
- if(logMINOR) Logger.minor(this,
"Restoring keys for persistent jobs...");
- for(int i=0;i<gets.length;i++) {
- if(persistent)
-
container.activate(gets[i], 1);
- GotKeyListener got = gets[i];
- SendableGet req =
got.getRequest(key, container);
- if(persistent)
- container.activate(req,
1);
- if(req == null) {
- Logger.error(this, "No
request for listener "+got+" while requeueing "+key);
- } else {
-
req.requeueAfterCooldown(key, now, container, clientContext);
- }
- if(persistent) {
-
container.deactivate(gets[i], 1);
-
container.deactivate(req, 1);
- }
- }
- }
- if(transientGets != null) {
- if(transientGets != null) {
- if(logMINOR) Logger.minor(this,
"Restoring keys for transient jobs...");
- for(int
i=0;i<transientGets.length;i++) {
- GotKeyListener got =
transientGets[i];
- SendableGet req =
got.getRequest(key, null);
- if(req == null) {
-
Logger.error(this, "No request for listener "+got+" while requeueing "+key);
- }
-
req.requeueAfterCooldown(key, now, container, clientContext);
- }
- }
- }
}
+ if(reqs != null) {
+ for(int i=0;i<reqs.length;i++)
+ reqs[i].requeueAfterCooldown(key, now, container, clientContext);
+ }
+ if(transientReqs != null) {
+ for(int i=0;i<transientReqs.length;i++)
+ transientReqs[i].requeueAfterCooldown(key, now, container, clientContext);
+ }
if(persistent)
container.deactivate(key, 5);
}
@@ -1167,5 +956,13 @@
schedTransient.removeFromAllRequestsByClientRequest(get, clientRequest, dontComplain, null);
}
+ public byte[] saltKey(Key key) {
+ MessageDigest md = SHA256.getMessageDigest();
+ md.update(key.getRoutingKey());
+ md.update(schedCore.globalSalt);
+ byte[] ret = md.digest();
+ SHA256.returnMessageDigest(md);
+ return ret;
+ }
}
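[Context note] The saltKey() method added above combines each routing key with a random per-scheduler salt (globalSalt, created in ClientRequestSchedulerCore further down) before any Bloom filter is probed. The stand-alone rendering below uses made-up key and salt values; only the SHA-256(routingKey || globalSalt) shape mirrors the committed code.

import java.security.MessageDigest;
import java.security.SecureRandom;

// Stand-alone rendering of the saltKey() pattern above; key and salt are
// random stand-ins for Key.getRoutingKey() and ClientRequestSchedulerCore.globalSalt.
public class SaltKeyExample {
	public static void main(String[] args) throws Exception {
		SecureRandom rng = new SecureRandom();

		byte[] globalSalt = new byte[32];
		rng.nextBytes(globalSalt);

		byte[] routingKey = new byte[32];
		rng.nextBytes(routingKey);

		MessageDigest md = MessageDigest.getInstance("SHA-256");
		md.update(routingKey);
		md.update(globalSalt);
		byte[] saltedKey = md.digest();

		// It is this salted value, not the raw routing key, that the
		// KeyListeners' Bloom filters are built over and probed with.
		System.out.println("salted key is " + saltedKey.length + " bytes");
	}
}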
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java	2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java	2008-08-20 00:24:11 UTC (rev 22046)
@@ -3,6 +3,9 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -11,9 +14,12 @@
import freenet.crypt.RandomSource;
import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.keys.NodeSSK;
import freenet.node.BaseSendableGet;
import freenet.node.RequestScheduler;
import freenet.node.RequestStarter;
+import freenet.node.SendableGet;
import freenet.node.SendableInsert;
import freenet.node.SendableRequest;
import freenet.support.Logger;
@@ -53,6 +59,8 @@
protected final Map allRequestsByClientRequest;
protected final List /* <BaseSendableGet> */ recentSuccesses;
protected transient ClientRequestScheduler sched;
+ /** Transient even for persistent scheduler. */
+ protected transient final Set<KeyListener> keyListeners;
abstract boolean persistent();
@@ -61,6 +69,7 @@
this.isSSKScheduler = forSSKs;
this.allRequestsByClientRequest = allRequestsByClientRequest;
this.recentSuccesses = recentSuccesses;
+ keyListeners = new HashSet<KeyListener>();
priorities = new SortedVectorByNumber[RequestStarter.NUMBER_OF_PRIORITY_CLASSES];
logMINOR = Logger.shouldLog(Logger.MINOR, ClientRequestSchedulerBase.class);
}
@@ -203,54 +212,115 @@
}
}
- /**
- * Keys must already be activated.
- * @param getter
- * @param keyTokens
- * @param container
- */
- public void addPendingKeys(GotKeyListener getter, Key[] keyTokens, ObjectContainer container) {
- if(persistent())
- container.activate(getter, 1);
- Key prevTok = null;
- for(int i=0;i<keyTokens.length;i++) {
- Key key = keyTokens[i];
- if(i != 0 && (prevTok == key || (prevTok != null && key != null && prevTok.equals(key)))) {
- Logger.error(this, "Ignoring duplicate token");
- continue;
+ public synchronized void addPendingKeys(KeyListener listener) {
+ keyListeners.add(listener);
+ }
+
+ public synchronized boolean removePendingKeys(KeyListener listener) {
+ boolean ret = keyListeners.remove(listener);
+ listener.onRemove();
+ return ret;
+ }
+
+ public synchronized boolean removePendingKeys(HasKeyListener hasListener) {
+ boolean found = false;
+ for(Iterator<KeyListener> i = keyListeners.iterator();i.hasNext();) {
+ KeyListener listener = i.next();
+ if(listener.getHasKeyListener() == hasListener) {
+ found = true;
+ i.remove();
+ listener.onRemove();
 }
- addPendingKey(key, getter, container);
 }
+ return found;
 }
- public short getKeyPrio(Key key, short priority, ObjectContainer container) {
- GotKeyListener[] getters = getClientsForPendingKey(key, container);
- if(getters == null) return priority;
- for(int i=0;i<getters.length;i++) {
- if(persistent())
- container.activate(getters[i], 1);
- short prio = getters[i].getPriorityClass(container);
+ public short getKeyPrio(Key key, short priority, ObjectContainer container, ClientContext context) {
+ byte[] saltedKey = ((key instanceof NodeSSK) ? context.getSskFetchScheduler() : context.getChkFetchScheduler()).saltKey(key);
+ ArrayList<KeyListener> matches = null;
+ synchronized(this) {
+ for(KeyListener listener : keyListeners) {
+ if(!listener.probablyWantKey(key, saltedKey)) continue;
+ if(matches == null) matches = new ArrayList<KeyListener> ();
+ matches.add(listener);
+ }
+ }
+ if(matches == null) return priority;
+ for(KeyListener listener : matches) {
+ short prio = listener.definitelyWantKey(key, saltedKey, container, sched.clientContext);
+ if(prio == -1) continue;
 if(prio < priority) priority = prio;
- if(persistent())
- container.deactivate(getters[i], 1);
 }
 return priority;
 }
- public abstract long countQueuedRequests(ObjectContainer container);
+ public synchronized long countQueuedRequests(ObjectContainer container) {
+ long count = 0;
+ for(KeyListener listener : keyListeners)
+ count += listener.countKeys();
+ return count;
+ }
- protected abstract boolean inPendingKeys(GotKeyListener req, Key key, ObjectContainer container);
+ public boolean anyWantKey(Key key, ObjectContainer container, ClientContext context) {
+ byte[] saltedKey = ((key instanceof NodeSSK) ? context.getSskFetchScheduler() : context.getChkFetchScheduler()).saltKey(key);
+ ArrayList<KeyListener> matches = null;
+ synchronized(this) {
+ for(KeyListener listener : keyListeners) {
+ if(!listener.probablyWantKey(key, saltedKey)) continue;
+ if(matches == null) matches = new ArrayList<KeyListener> ();
+ matches.add(listener);
+ }
+ }
+ if(matches != null) {
+ for(KeyListener listener : matches) {
+ if(listener.definitelyWantKey(key, saltedKey, container, sched.clientContext) >= 0)
+ return true;
+ }
+ }
+ return false;
+ }
- public abstract GotKeyListener[] getClientsForPendingKey(Key key, ObjectContainer container);
+ public synchronized boolean anyProbablyWantKey(Key key, ClientContext context) {
+ byte[] saltedKey = ((key instanceof NodeSSK) ? context.getSskFetchScheduler() : context.getChkFetchScheduler()).saltKey(key);
+ for(KeyListener listener : keyListeners) {
+ if(listener.probablyWantKey(key, saltedKey))
+ return true;
+ }
+ return false;
+ }
- public abstract boolean anyWantKey(Key key, ObjectContainer container);
+ public void tripPendingKey(Key key, KeyBlock block, ObjectContainer container, ClientContext context) {
+ byte[] saltedKey = ((key instanceof NodeSSK) ? context.getSskFetchScheduler() : context.getChkFetchScheduler()).saltKey(key);
+ ArrayList<KeyListener> matches = null;
+ synchronized(this) {
+ for(KeyListener listener : keyListeners) {
+ if(!listener.probablyWantKey(key, saltedKey)) continue;
+ if(matches == null) matches = new ArrayList<KeyListener> ();
+ matches.add(listener);
+ }
+ }
+ if(matches != null) {
+ for(KeyListener listener : matches)
+ listener.handleBlock(key, saltedKey, block, container, context);
+ }
+ }
+
+ public SendableGet[] requestsForKey(Key key, ObjectContainer container, ClientContext context) {
+ ArrayList<SendableGet> list = null;
+ byte[] saltedKey = ((key instanceof NodeSSK) ? context.getSskFetchScheduler() : context.getChkFetchScheduler()).saltKey(key);
+ synchronized(this) {
+ for(KeyListener listener : keyListeners) {
+ if(!listener.probablyWantKey(key, saltedKey)) continue;
+ SendableGet[] reqs = listener.getRequestsForKey(key, saltedKey, container, context);
+ if(reqs == null) continue;
+ if(list == null) list = new ArrayList<SendableGet>();
+ for(int i=0;i<reqs.length;i++) list.add(reqs[i]);
+ }
+ }
+ if(list == null) return null;
+ else return list.toArray(new SendableGet[list.size()]);
+ }
- public abstract GotKeyListener[] removePendingKey(Key key, ObjectContainer container);
-
- public abstract boolean removePendingKey(GotKeyListener getter, boolean complain, Key key, ObjectContainer container);
-
- abstract void addPendingKey(Key key, GotKeyListener getter, ObjectContainer container);
-
 protected abstract Set makeSetForAllRequestsByClientRequest(ObjectContainer container);
}
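[Context note] All of the new lookup methods in ClientRequestSchedulerBase above follow the same two-phase pattern: under the lock, collect the listeners whose Bloom filter reports a probable match for the salted key; outside the lock, run the exact (and potentially database-touching) check on those few only. The generic sketch below illustrates that pattern; the Matcher interface merely stands in for KeyListener and is not Freenet code.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Generic illustration of the two-phase lookup used above; all names invented.
// Phase 1 (under the lock) is the cheap Bloom-filter pass, phase 2 (outside
// the lock) is the exact, possibly expensive confirmation.
class TwoPhaseLookup<K> {

	interface Matcher<K> {
		boolean probablyWants(K saltedKey);   // Bloom filter check, may false-positive
		boolean definitelyWants(K saltedKey); // exact check, may hit the database
	}

	private final Set<Matcher<K>> matchers = new HashSet<Matcher<K>>();

	synchronized void add(Matcher<K> m) {
		matchers.add(m);
	}

	boolean anyWant(K saltedKey) {
		List<Matcher<K>> probable = null;
		synchronized(this) {
			for(Matcher<K> m : matchers) {
				if(!m.probablyWants(saltedKey)) continue;
				if(probable == null) probable = new ArrayList<Matcher<K>>();
				probable.add(m);
			}
		}
		if(probable == null) return false;
		for(Matcher<K> m : probable)
			if(m.definitelyWants(saltedKey)) return true;
		return false;
	}
}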
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java	2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java	2008-08-20 00:24:11 UTC (rev 22046)
@@ -20,6 +20,7 @@
import freenet.crypt.RandomSource;
import freenet.keys.ClientKey;
import freenet.keys.Key;
+import freenet.keys.KeyBlock;
import freenet.node.BaseSendableGet;
import freenet.node.KeysFetchingLocally;
import freenet.node.Node;
@@ -61,6 +62,8 @@
*/
private transient HashSet keysFetching;
+ public final byte[] globalSalt;
+
/**
* Fetch a ClientRequestSchedulerCore from the database, or create a
new one.
* @param node
@@ -106,7 +109,8 @@
} else {
this.persistentCooldownQueue = null;
}
-
+ globalSalt = new byte[32];
+ node.random.nextBytes(globalSalt);
}
private void onStarted(ObjectContainer container, long cooldownTime,
ClientRequestScheduler sched, ClientContext context) {
@@ -200,8 +204,24 @@
};
registerMeRunner = new RegisterMeRunner();
+ loadKeyListeners(container, context);
}
+ private void loadKeyListeners(final ObjectContainer container, ClientContext context) {
+ ObjectSet<HasKeyListener> results =
+ container.query(HasKeyListener.class);
+ for(HasKeyListener l : results) {
+ try {
+ if(l.isCancelled(container)) continue;
+ addPendingKeys(l.makeKeyListener(container, context));
+ } catch (KeyListenerConstructionException e) {
+ System.err.println("FAILED TO LOAD REQUEST BLOOM FILTERS:");
+ e.printStackTrace();
+ Logger.error(this, "FAILED TO LOAD REQUEST BLOOM FILTERS: "+e, e);
+ }
+ }
+ }
+
private transient DBJob preRegisterMeRunner;
void start(DBJobRunner runner) {
@@ -614,45 +634,10 @@
long endNext = System.currentTimeMillis();
if(logMINOR)
Logger.minor(this, "RegisterMe: next()
took "+(endNext-startNext));
- if(reg.getters != null) {
- boolean allKilled = true;
- for(int j=0;j<reg.getters.length;j++) {
-
container.activate(reg.getters[j], 2);
-
if(!reg.getters[j].isCancelled(container))
- allKilled = false;
- }
- if(reg.listener != null) {
-
if(!reg.listener.isCancelled(container))
- allKilled = false;
- }
- if(allKilled) {
- if(logMINOR)
- Logger.minor(this, "Not
registering as all SendableGet's already cancelled");
- continue;
- }
- }
if(logMINOR)
- Logger.minor(this, "Running RegisterMe "+reg+" for "+reg.listener+" and "+reg.getters+" and "+reg.nonGetRequest+" : "+reg.key.addedTime+" : "+reg.key.priority);
+ Logger.minor(this, "Running RegisterMe "+reg+" for "+reg.nonGetRequest+" : "+reg.key.addedTime+" : "+reg.key.priority);
 // Don't need to activate, fields should exist? FIXME
- if(reg.listener != null || reg.getters != null) {
- try {
- sched.register(reg.listener,
reg.getters, false, true, true, reg.blocks, reg);
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t+"
running RegisterMeRunner", t);
- // Cancel the request, and commit so it
isn't tried again.
- if(reg.getters != null) {
- for(int
k=0;k<reg.getters.length;k++)
-
reg.getters[k].internalError(t, sched, container, context, true);
- }
- }
- if(reg.listener != null)
- container.deactivate(reg.listener, 1);
- if(reg.getters != null) {
- for(int j=0;j<reg.getters.length;j++)
-
container.deactivate(reg.getters[j], 1);
- }
- }
if(reg.nonGetRequest != null) {
container.activate(reg.nonGetRequest,
1);
if(reg.nonGetRequest.isCancelled(container)) {
@@ -725,172 +710,6 @@
return new Db4oSet(container, 1);
}
- private ObjectSet queryForKey(final Key key, ObjectContainer container)
{
- final String pks = HexUtil.bytesToHex(key.getFullKey());
- long startTime = System.currentTimeMillis();
- // Can db4o handle this???
- // Apparently not. Diagnostics say it's not optimised. Which is
annoying,
- // since it can quite clearly be turned into 2 simple
constraints and
- // one evaluation... :(
- // FIXME maybe db4o 7.2 can handle this???
-// ObjectSet ret = container.query(new Predicate() {
-// public boolean match(PendingKeyItem item) {
-// if(!pks.equals(item.fullKeyAsBytes)) return
false;
-// if(item.nodeDBHandle != nodeDBHandle) return
false;
-// if(!key.equals(item.key)) return false;
-// return true;
-// }
-// });
- Query query = container.query();
- query.constrain(PendingKeyItem.class);
-
query.descend("fullKeyAsBytes").constrain(pks).and(query.descend("nodeDBHandle").constrain(new
Long(nodeDBHandle)));
- Evaluation eval = new Evaluation() {
-
- public void evaluate(Candidate candidate) {
- PendingKeyItem item = (PendingKeyItem)
candidate.getObject();
- Key k = item.key;
- candidate.objectContainer().activate(k, 5);
- if(k.equals(key))
- candidate.include(true);
- else {
- candidate.include(false);
- }
- }
-
- };
- query.constrain(eval);
- ObjectSet ret = query.execute();
- long endTime = System.currentTimeMillis();
- if(endTime - startTime > 1000)
- Logger.error(this, "Query took "+(endTime -
startTime)+"ms for "+((key instanceof freenet.keys.NodeSSK) ? "SSK" : "CHK"));
- else if(logMINOR)
- Logger.minor(this, "Query took "+(endTime -
startTime)+"ms for "+((key instanceof freenet.keys.NodeSSK) ? "SSK" : "CHK"));
- return ret;
- }
-
- public long countQueuedRequests(ObjectContainer container) {
-// ObjectSet pending = container.query(new Predicate() {
-// public boolean match(PendingKeyItem item) {
-// if(item.nodeDBHandle == nodeDBHandle) return
true;
-// return false;
-// }
-// });
-// return pending.size();
- // If we just ask for the set of all PendingKeyItem's, we can
- // filter them manually, and the query doesn't need to allocate
any
- // significant amount of RAM - it just remembers to return the
class
- // index.
- ObjectSet pending = container.query(PendingKeyItem.class);
- long total = 0;
- while(pending.hasNext()) {
- PendingKeyItem item = (PendingKeyItem) pending.next();
- if(item.nodeDBHandle != nodeDBHandle) {
- container.deactivate(item, 1);
- continue;
- }
- container.deactivate(item, 1);
- total++;
- }
- return total;
- }
-
- protected boolean inPendingKeys(GotKeyListener req, final Key key,
ObjectContainer container) {
- ObjectSet pending = queryForKey(key, container);
- if(pending.hasNext()) {
- PendingKeyItem item = (PendingKeyItem) pending.next();
- return item.hasGetter(req);
- }
- Logger.error(this, "Key not in pendingKeys at all");
-// Key copy = key.cloneKey();
-// addPendingKey(copy, req, container);
-// container.commit();
-// pending = container.query(new Predicate() {
-// public boolean match(PendingKeyItem item) {
-// if(!key.equals(item.key)) return false;
-// if(item.nodeDBHandle != nodeDBHandle) return
false;
-// return true;
-// }
-// });
-// if(!pending.hasNext()) {
-// Logger.error(this, "INDEXES BROKEN!!!");
-// } else {
-// PendingKeyItem item = (PendingKeyItem) (pending.next());
-// Key k = item.key;
-// container.delete(item);
-// Logger.error(this, "Indexes work");
-// }
- return false;
- }
-
- public GotKeyListener[] getClientsForPendingKey(final Key key,
ObjectContainer container) {
- ObjectSet pending = queryForKey(key, container);
- if(pending.hasNext()) {
- PendingKeyItem item = (PendingKeyItem) pending.next();
- return item.getters();
- }
- return null;
- }
-
- public boolean anyWantKey(final Key key, ObjectContainer container) {
- ObjectSet pending = queryForKey(key, container);
- return pending.hasNext();
- }
-
- public GotKeyListener[] removePendingKey(final Key key, ObjectContainer
container) {
- ObjectSet pending = queryForKey(key, container);
- if(pending.hasNext()) {
- PendingKeyItem item = (PendingKeyItem) pending.next();
- GotKeyListener[] getters = item.getters();
- container.delete(item);
- return getters;
- }
- return null;
- }
-
- public boolean removePendingKey(GotKeyListener getter, boolean
complain, final Key key, ObjectContainer container) {
- ObjectSet pending = queryForKey(key, container);
- if(pending.hasNext()) {
- PendingKeyItem item = (PendingKeyItem) pending.next();
- boolean ret = item.removeGetter(getter);
- if(item.isEmpty()) {
- container.delete(item);
- } else {
- container.set(item);
- }
- return ret;
- }
- return false;
- }
-
- protected void addPendingKey(final Key key, GotKeyListener getter,
ObjectContainer container) {
- if(logMINOR)
- Logger.minor(this, "Adding pending key for "+key+" for
"+getter);
- long startTime = System.currentTimeMillis();
-// Query query = container.query();
-// query.constrain(PendingKeyItem.class);
-// query.descend("key").constrain(key);
-// query.descend("nodeDBHandle").constrain(new Long(nodeDBHandle));
-// ObjectSet pending = query.execute();
-
- // Native version seems to be faster, at least for a few
thousand items...
- // I'm not sure whether it's using the index though, we may
need to reconsider for larger queues... FIXME
-
- ObjectSet pending = queryForKey(key, container);
- long endTime = System.currentTimeMillis();
- if(endTime - startTime > 1000)
- Logger.error(this, "Query took "+(endTime -
startTime)+"ms for "+((key instanceof freenet.keys.NodeSSK) ? "SSK" : "CHK"));
- else if(logMINOR)
- Logger.minor(this, "Query took "+(endTime -
startTime)+"ms for "+((key instanceof freenet.keys.NodeSSK) ? "SSK" : "CHK"));
- if(pending.hasNext()) {
- PendingKeyItem item = (PendingKeyItem) pending.next();
- item.addGetter(getter);
- container.set(item);
- } else {
- PendingKeyItem item = new PendingKeyItem(key, getter,
nodeDBHandle);
- container.set(item);
- }
- }
-
public void rerunRegisterMeRunner(DBJobRunner runner) {
synchronized(this) {
shouldReRunRegisterMeRunner = true;
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java	2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java	2008-08-20 00:24:11 UTC (rev 22046)
@@ -59,181 +59,6 @@
return new HashSet();
}
- /**
- * Register a pending key to an already-registered request. This is
necessary if we've
- * already registered a SendableGet, but we later add some more keys to
it.
- */
- void addPendingKey(Key nodeKey, GotKeyListener getter, ObjectContainer
container) {
- logMINOR = Logger.shouldLog(Logger.MINOR,
ClientRequestSchedulerBase.class);
- if(logMINOR)
- Logger.minor(this, "Adding pending key "+nodeKey+" for
"+getter);
- synchronized(pendingKeys) {
- Object o = pendingKeys.get(nodeKey);
- if(o == null) {
- pendingKeys.put(nodeKey, getter);
- } else if(o instanceof GotKeyListener) {
- GotKeyListener oldGet = (GotKeyListener) o;
- if(oldGet != getter) {
- pendingKeys.put(nodeKey, new
GotKeyListener[] { oldGet, getter });
- }
- } else {
- GotKeyListener[] gets = (GotKeyListener[]) o;
- boolean found = false;
- for(int j=0;j<gets.length;j++) {
- if(gets[j] == getter) {
- found = true;
- break;
- }
- }
- if(!found) {
- GotKeyListener[] newGets = new
GotKeyListener[gets.length+1];
- System.arraycopy(gets, 0, newGets, 0,
gets.length);
- newGets[gets.length] = getter;
- pendingKeys.put(nodeKey, newGets);
- }
- }
- }
- }
-
- public boolean removePendingKey(GotKeyListener getter, boolean
complain, Key key, ObjectContainer container) {
- if(logMINOR)
- Logger.minor(this, "Removing pending key: "+getter+"
for "+key);
- boolean dropped = false;
- Object o;
- synchronized(pendingKeys) {
- o = pendingKeys.get(key);
- if(o == null) {
- if(complain)
- Logger.normal(this, "Not found:
"+getter+" for "+key+" removing (no such key)", new Exception("debug"));
- } else if(o instanceof GotKeyListener) {
- GotKeyListener oldGet = (GotKeyListener) o;
- if(oldGet != getter) {
- if(complain)
- Logger.normal(this, "Not found:
"+getter+" for "+key+" removing (1 getter)", new Exception("debug"));
- } else {
- dropped = true;
- pendingKeys.remove(key);
- if(logMINOR)
- Logger.minor(this, "Removed
only getter (1) for "+key, new Exception("debug"));
- }
- } else {
- GotKeyListener[] gets = (GotKeyListener[]) o;
- final int getsLength = gets.length;
- GotKeyListener[] newGets = new
GotKeyListener[getsLength > 1 ? getsLength-1 : 0];
- boolean found = false;
- int x = 0;
- for(int j=0;j<getsLength;j++) {
- if(gets[j] == getter) {
- found = true;
- dropped = true;
- continue;
- }
- if(x == newGets.length) {
- if(!found) {
- if(complain)
-
Logger.normal(this, "Not found: "+getter+" for "+key+" removing ("+getsLength+"
getters)");
- return false; // not
here
- } // else is a contradiction,
let it get an ArrayIndexOutOfBounds.
- }
- if(gets[j] == null) continue;
- if(gets[j].isCancelled(container))
continue;
- newGets[x++] = gets[j];
- }
- if(x == 0) {
- pendingKeys.remove(key);
- if(logMINOR)
- Logger.minor(this, "Removed
only getter (2) for "+key, new Exception("debug"));
- } else if(x == 1) {
- pendingKeys.put(key, newGets[0]);
- } else {
- if(x != getsLength-1) {
- GotKeyListener[] newNewGets =
new GotKeyListener[x];
- System.arraycopy(newGets, 0,
newNewGets, 0, x);
- newGets = newNewGets;
- }
- pendingKeys.put(key, newGets);
- }
- }
- }
- return dropped;
- }
-
- public GotKeyListener[] removePendingKey(Key key, ObjectContainer
container) {
- Object o;
- final GotKeyListener[] gets;
- synchronized(pendingKeys) {
- o = pendingKeys.remove(key);
- }
- if(o == null) return null;
- if(o instanceof GotKeyListener) {
- gets = new GotKeyListener[] { (GotKeyListener) o };
- if(logMINOR)
- Logger.minor(this, "Removing all pending keys
for "+key+" (1)", new Exception("debug"));
- } else {
- gets = (GotKeyListener[]) o;
- if(logMINOR)
- Logger.minor(this, "Removing all pending keys
for "+key+" ("+gets.length+")", new Exception("debug"));
- }
- return gets;
- }
-
- public boolean anyWantKey(Key key, ObjectContainer container) {
- synchronized(pendingKeys) {
- return pendingKeys.get(key) != null;
- }
- }
-
- public short getKeyPrio(Key key, short priority, ObjectContainer
container) {
- synchronized(pendingKeys) {
- Object o = pendingKeys.get(key);
- if(o == null) {
- // Blah
- } else if(o instanceof GotKeyListener) {
- short p =
((GotKeyListener)o).getPriorityClass(container);
- if(p < priority) priority = p;
- } else { // if(o instanceof SendableGet[]) {
- GotKeyListener[] gets = (GotKeyListener[]) o;
- for(int i=0;i<gets.length;i++) {
- short p =
gets[i].getPriorityClass(container);
- if(p < priority) priority = p;
- }
- }
- }
- return priority;
- }
-
- public GotKeyListener[] getClientsForPendingKey(Key key,
ObjectContainer container) {
- Object o;
- synchronized(pendingKeys) {
- o = pendingKeys.get(key);
- }
- if(o == null) {
- return null;
- } else if(o instanceof GotKeyListener) {
- GotKeyListener get = (GotKeyListener) o;
- return new GotKeyListener[] { get };
- } else {
- return (GotKeyListener[]) o;
- }
- }
-
- protected boolean inPendingKeys(GotKeyListener req, Key key,
ObjectContainer container) {
- Object o;
- synchronized(pendingKeys) {
- o = pendingKeys.get(key);
- }
- if(o == null) {
- return false;
- } else if(o instanceof GotKeyListener) {
- return o == req;
- } else {
- GotKeyListener[] gets = (GotKeyListener[]) o;
- for(int i=0;i<gets.length;i++)
- if(gets[i] == req) return true;
- }
- return false;
- }
-
public long countQueuedRequests(ObjectContainer container) {
if(pendingKeys != null)
return pendingKeys.size();
Deleted: branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java	2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java	2008-08-20 00:24:11 UTC (rev 22046)
@@ -1,58 +0,0 @@
-package freenet.client.async;
-
-import com.db4o.ObjectContainer;
-
-import freenet.keys.Key;
-import freenet.keys.KeyBlock;
-import freenet.node.SendableGet;
-
-public interface GotKeyListener {
-
- /**
- * Callback for when a block is found. Will be called on the database executor thread.
- * @param key
- * @param block
- * @param sched
- */
- public abstract void onGotKey(Key key, KeyBlock block, ObjectContainer container, ClientContext context);
-
- /**
- * What keys are we interested in?
- * @param container Database handle.
- */
- Key[] listKeys(ObjectContainer container);
-
- /**
- * Is this related to a persistent request?
- */
- boolean persistent();
-
- /**
- * Priority of the associated request.
- * @param container Database handle.
- */
- short getPriorityClass(ObjectContainer container);
-
- /**
- * Is the request cancelled/finished/invalid?
- * @param container Database handle.
- */
- boolean isCancelled(ObjectContainer container);
-
- /**
- * Get the SendableGet for a specific key, if any.
- * Used in requeueing requests after a cooldown has expired.
- * @param key The key.
- * @param container The database handle.
- * @return Null if we don't want to register a request for the key,
- * otherwise the SendableGet.
- */
- public abstract SendableGet getRequest(Key key, ObjectContainer container);
-
- /**
- * @return True if when checking the datastore on initial registration, we
- * should not promote any blocks found.
- */
- public abstract boolean dontCache(ObjectContainer container);
-
-}
Deleted: branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java	2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java	2008-08-20 00:24:11 UTC (rev 22046)
@@ -1,82 +0,0 @@
-package freenet.client.async;
-
-import com.db4o.ObjectContainer;
-
-import freenet.keys.Key;
-import freenet.support.HexUtil;
-
-public class PendingKeyItem {
-
- final long nodeDBHandle;
- final Key key;
- /**
- * EVIL DB4O HACK:
- * Db4o does not support indexing objects with a Comparator. It will only
- * index by the object id. It will not index by a byte[]. But it WILL index
- * by a string quite happily and very fast. So we convert to a string here.
- * Not doing so results in db4o instantiating every key in order to compare
- * it... whereas doing so results in a fast index lookup.
- */
- final String fullKeyAsBytes;
- private GotKeyListener[] getters;
-
- PendingKeyItem(Key key, GotKeyListener getter, long nodeDBHandle) {
- this.key = key;
- this.getters = new GotKeyListener[] { getter };
- this.nodeDBHandle = nodeDBHandle;
- this.fullKeyAsBytes = HexUtil.bytesToHex(key.getFullKey());
- }
-
- public void addGetter(GotKeyListener getter) {
- for(int i=0;i<getters.length;i++) {
- if(getters[i] == getter) return;
- }
- GotKeyListener[] newGetters = new
GotKeyListener[getters.length+1];
- System.arraycopy(getters, 0, newGetters, 0, getters.length);
- newGetters[getters.length] = getter;
- getters = newGetters;
- }
-
- /**
- * @param getter
- * @return True if the getter was removed. Caller should check
isEmpty() afterwards.
- */
- public boolean removeGetter(GotKeyListener getter) {
- int found = 0;
- for(int i=0;i<getters.length;i++) {
- if(getters[i] == getter) found++;
- }
- if(found == 0) return false;
- if(found == getters.length)
- getters = new GotKeyListener[0];
- else {
- GotKeyListener[] newGetters = new
GotKeyListener[getters.length - found];
- int x = 0;
- for(int i=0;i<getters.length;i++) {
- if(getters[i] == getter) continue;
- newGetters[x++] = getters[i];
- }
- getters = newGetters;
- }
- return true;
- }
-
- public boolean isEmpty() {
- return getters.length == 0;
- }
-
- public boolean hasGetter(GotKeyListener req) {
- for(int i=0;i<getters.length;i++)
- if(getters[i] == req) return true;
- return false;
- }
-
- public GotKeyListener[] getters() {
- return getters;
- }
-
- public void objectOnActivate(ObjectContainer container) {
- container.activate(key, 5);
- }
-
-}
\ No newline at end of file
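
The "EVIL DB4O HACK" comment in the deleted PendingKeyItem above describes a general db4o limitation: a byte[] field is not usefully indexable, but a String field is, so the binary key is mirrored into a hex string and queried by that. A minimal sketch of the same workaround pattern; the class name, field names and query below are invented for illustration and are not part of this commit.

    // Hypothetical sketch of the hex-string index workaround described above.
    import com.db4o.Db4o;
    import com.db4o.ObjectContainer;
    import com.db4o.ObjectSet;
    import com.db4o.query.Query;

    import freenet.support.HexUtil;

    public class HexIndexedItem {
        final byte[] rawKey;     // db4o will not index this usefully
        final String keyAsHex;   // indexed; compared without activating the object

        public HexIndexedItem(byte[] rawKey) {
            this.rawKey = rawKey;
            this.keyAsHex = HexUtil.bytesToHex(rawKey);
        }

        // Register the index once, before the ObjectContainer is opened.
        public static void configureIndex() {
            Db4o.configure().objectClass(HexIndexedItem.class)
                    .objectField("keyAsHex").indexed(true);
        }

        // The string constraint hits the index instead of instantiating every
        // stored object just to compare its key.
        public static ObjectSet lookup(ObjectContainer container, byte[] key) {
            Query q = container.query();
            q.constrain(HexIndexedItem.class);
            q.descend("keyAsHex").constrain(HexUtil.bytesToHex(key));
            return q.execute();
        }
    }
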
Modified: branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -2,17 +2,14 @@
import com.db4o.ObjectContainer;
-import freenet.node.SendableGet;
import freenet.node.SendableRequest;
/**
* These must be deleted once the request has been registered.
- * See PersistentChosenRequest.
+ * See DatastoreCheckerItem: this class only handles inserts.
* @author toad
*/
public class RegisterMe {
- final GotKeyListener listener;
- final SendableGet[] getters;
final SendableRequest nonGetRequest;
final ClientRequestSchedulerCore core;
final RegisterMeSortKey key;
@@ -23,21 +20,13 @@
private final int hashCode;
public final BlockSet blocks;
- RegisterMe(GotKeyListener listener, SendableGet[] getters,
SendableRequest nonGetRequest, short prio, ClientRequestSchedulerCore core,
BlockSet blocks, long bootID) {
- this.listener = listener;
+ RegisterMe(SendableRequest nonGetRequest, short prio,
ClientRequestSchedulerCore core, BlockSet blocks, long bootID) {
this.bootID = bootID;
- this.getters = getters;
this.core = core;
this.nonGetRequest = nonGetRequest;
this.key = new RegisterMeSortKey(prio);
this.blocks = blocks;
int hash = core.hashCode();
- if(listener != null)
- hash ^= listener.hashCode();
- if(getters != null) {
- for(int i=0;i<getters.length;i++)
- hash ^= getters[i].hashCode();
- }
if(nonGetRequest != null)
hash ^= nonGetRequest.hashCode();
hash *= prio;
Modified:
branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -166,4 +166,8 @@
return token;
}
+ public void onFailed(KeyListenerConstructionException e,
ObjectContainer container, ClientContext context) {
+ onFailure(e.getFetchException(), false, container, context);
+ }
+
}
Modified: branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -520,11 +520,11 @@
f.addDecompressor(codec);
}
parent.onTransition(this, f, container);
- f.schedule(container, context, false);
if(persistent) {
container.set(metaStrings);
- container.set(this);
+ container.set(this); // Store *before*
scheduling to avoid activation problems.
}
+ f.schedule(container, context);
// All done! No longer our problem!
metadata = null; // Get rid just in case we
stick around somehow.
return;
@@ -605,7 +605,12 @@
SplitFileFetcher sf = new
SplitFileFetcher(metadata, rcb, parent, ctx,
decompressors, clientMetadata,
actx, recursionLevel, returnBucket, token, container, context);
parent.onTransition(this, sf, container);
- sf.schedule(container, context, false);
+ try {
+ sf.schedule(container, context);
+ } catch (KeyListenerConstructionException e) {
+ onFailure(e.getFetchException(), false,
container, context);
+ return;
+ }
if(persistent) container.deactivate(sf, 1);
rcb.onBlockSetFinished(this, container,
context);
// Clear our own metadata, we won't need it any
more.
@@ -936,7 +941,7 @@
if(l == usk.suggestedEdition) {
SingleFileFetcher sf = new
SingleFileFetcher(parent, cb, clientMetadata, key, metaStrings,
key.getURI().addMetaStrings(metaStrings),
0, ctx, actx, null,
null, maxRetries, recursionLevel+1, dontTellClientGet, token, false,
returnBucket, true, container, context);
- sf.schedule(container, context, false);
+ sf.schedule(container, context);
} else {
cb.onFailure(new
FetchException(FetchException.PERMANENT_REDIRECT,
newUSK.getURI().addMetaStrings(metaStrings)), null, container, context);
}
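
The "Store *before* scheduling to avoid activation problems" comment above reflects the db4o discipline used throughout this branch: activate an object to the depth you need, mutate it, container.set() it while it is still consistent, and only then hand it to code that may run on another thread or deactivate it. A minimal sketch with a hypothetical persistent class and scheduler, not code from the tree:

    // Hypothetical sketch of the activate / set / deactivate discipline.
    import com.db4o.ObjectContainer;

    public class ActivationSketch {
        static class PersistentState {
            int retries;
        }

        interface Scheduler {
            void schedule(PersistentState state);
        }

        static void bumpAndSchedule(ObjectContainer container, boolean persistent,
                PersistentState state, Scheduler scheduler) {
            if (persistent)
                container.activate(state, 1);   // page the fields in to depth 1
            state.retries++;
            // Store *before* scheduling, so whichever thread picks this up
            // sees the updated, stored state.
            if (persistent)
                container.set(state);
            scheduler.schedule(state);
            if (persistent)
                container.deactivate(state, 1); // keep the in-memory graph small
        }
    }
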
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -3,6 +3,7 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -19,6 +20,8 @@
import freenet.keys.CHKBlock;
import freenet.keys.ClientCHK;
import freenet.keys.NodeCHK;
+import freenet.node.SendableGet;
+import freenet.support.BloomFilter;
import freenet.support.Fields;
import freenet.support.Logger;
import freenet.support.OOMHandler;
@@ -30,7 +33,7 @@
* Fetch a splitfile, decompress it if need be, and return it to the
GetCompletionCallback.
* Most of the work is done by the segments, and we do not need a thread.
*/
-public class SplitFileFetcher implements ClientGetState {
+public class SplitFileFetcher implements ClientGetState, HasKeyListener {
final FetchContext fetchContext;
final ArchiveContext archiveContext;
@@ -60,6 +63,7 @@
private boolean finished;
private long token;
final boolean persistent;
+ private FetchException otherFailure;
// A persistent hashCode is helpful in debugging, and also means we can
put
// these objects into sets etc when we need to.
@@ -70,6 +74,42 @@
return hashCode;
}
+ // Bloom filter stuff
+ /** The main bloom filter, which includes every key in the segment, is
stored
+ * in this file. It is a counting filter and is updated when a key is
found. */
+ File mainBloomFile;
+ /** The per-segment bloom filters are kept in this (slightly larger)
file,
+ * appended one after the next. */
+ File altBloomFile;
+ /** Size of the main Bloom filter in bytes. */
+ final int mainBloomFilterSizeBytes;
+	/** Default mainBloomElementsPerKey. The false positive rate is approximately
+	 * 0.6185^[this number], so 19 gives us 0.01% false positives, which should
+	 * be acceptable even if there are thousands of splitfiles on the queue. */
+ static final int DEFAULT_MAIN_BLOOM_ELEMENTS_PER_KEY = 19;
+ /** Number of hashes for the main filter. */
+ final int mainBloomK;
+ /** What proportion of false positives is acceptable for the per-segment
+ * Bloom filters? This is divided by the number of segments, so it is
(roughly)
+ * an overall probability of any false positive given that we reach the
+ * per-segment filters. IMHO 1 in 100 is adequate. */
+ static final double ACCEPTABLE_BLOOM_FALSE_POSITIVES_ALL_SEGMENTS =
0.01;
+ /** Size of per-segment bloom filter in bytes. This is calculated from
the
+ * above constant and the number of segments, and rounded up. */
+ final int perSegmentBloomFilterSizeBytes;
+ /** Number of hashes for the per-segment bloom filters. */
+ final int perSegmentK;
+ private int keyCount;
+ /** Salt used in the secondary Bloom filters if the primary matches.
+ * The primary Bloom filters use the already-salted saltedKey. */
+ private final byte[] localSalt;
+ /** Reference set on the first call to makeKeyListener().
+ * NOTE: db4o DOES NOT clear transient variables on deactivation.
+ * So as long as this is paged in (i.e. there is a reference to it,
i.e. the
+ * KeyListener), it will remain valid, once it is set by the first call
+ * during resuming. */
+ private transient SplitFileFetcherKeyListener tempListener;
+
public SplitFileFetcher(Metadata metadata, GetCompletionCallback rcb,
ClientRequester parent2,
FetchContext newCtx, ArrayList decompressors2,
ClientMetadata clientMetadata,
ArchiveContext actx, int recursionLevel, Bucket
returnBucket, long token2, ObjectContainer container, ClientContext context)
throws FetchException, MetadataParseException {
@@ -84,6 +124,8 @@
this.cb = rcb;
this.recursionLevel = recursionLevel + 1;
this.parent = parent2;
+ localSalt = new byte[32];
+ context.random.nextBytes(localSalt);
if(parent2.isCancelled())
throw new FetchException(FetchException.CANCELLED);
overrideLength = metadata.dataLength();
@@ -160,7 +202,7 @@
if(splitfileCheckBlocks.length > 0)
System.arraycopy(splitfileCheckBlocks, 0,
newSplitfileCheckBlocks, 0, splitfileCheckBlocks.length);
segments[0] = new
SplitFileFetcherSegment(splitfileType, newSplitfileDataBlocks,
newSplitfileCheckBlocks,
- this, archiveContext, fetchContext,
maxTempLength, recursionLevel, parent);
+ this, archiveContext, fetchContext,
maxTempLength, recursionLevel, parent, 0);
if(persistent) {
container.set(segments[0]);
segments[0].deactivateKeys(container);
@@ -182,7 +224,7 @@
dataBlocksPtr += copyDataBlocks;
checkBlocksPtr += copyCheckBlocks;
segments[i] = new
SplitFileFetcherSegment(splitfileType, dataBlocks, checkBlocks, this,
archiveContext,
- fetchContext, maxTempLength,
recursionLevel+1, parent);
+ fetchContext, maxTempLength,
recursionLevel+1, parent, i);
if(persistent) {
container.set(segments[i]);
segments[i].deactivateKeys(container);
@@ -198,6 +240,71 @@
parent.addBlocks(splitfileDataBlocks.length +
splitfileCheckBlocks.length, container);
parent.addMustSucceedBlocks(splitfileDataBlocks.length,
container);
parent.notifyClients(container, context);
+
+ // Setup bloom parameters.
+ if(persistent) {
+ // FIXME: Should this be encrypted? It's protected to
some degree by the salt...
+ // Since it isn't encrypted, it's likely to be very
sparse; we should name
+ // it appropriately...
+ try {
+ mainBloomFile = context.fg.makeRandomFile();
+ altBloomFile = context.fg.makeRandomFile();
+ } catch (IOException e) {
+ throw new
FetchException(FetchException.BUCKET_ERROR, "Unable to create Bloom filter
files", e);
+ }
+ } else {
+ // Not persistent, keep purely in RAM.
+ mainBloomFile = null;
+ altBloomFile = null;
+ }
+ int mainElementsPerKey = DEFAULT_MAIN_BLOOM_ELEMENTS_PER_KEY;
+ int origSize = splitfileDataBlocks.length +
splitfileCheckBlocks.length;
+ mainBloomK = (int) (mainElementsPerKey * 0.7);
+ long elementsLong = origSize * mainElementsPerKey;
+ // REDFLAG: SIZE LIMIT: 3.36TB limit!
+ if(elementsLong > Integer.MAX_VALUE)
+ throw new FetchException(FetchException.TOO_BIG,
"Cannot fetch splitfiles with more than
"+(Integer.MAX_VALUE/mainElementsPerKey)+" keys! (approx 3.3TB)");
+ int mainSizeBits = (int)elementsLong; // counting filter
+ if((mainSizeBits & 7) != 0)
+ mainSizeBits += (8 - (mainSizeBits & 7));
+ mainBloomFilterSizeBytes = mainSizeBits / 8 * 2; // counting
filter
+ double acceptableFalsePositives =
ACCEPTABLE_BLOOM_FALSE_POSITIVES_ALL_SEGMENTS / segments.length;
+ int perSegmentBitsPerKey = (int)
Math.ceil(Math.log(acceptableFalsePositives) / Math.log(0.6185));
+ int segBlocks = blocksPerSegment + checkBlocksPerSegment;
+ if(segBlocks < origSize)
+ segBlocks = origSize;
+ int perSegmentSize = perSegmentBitsPerKey * segBlocks;
+ if((perSegmentSize & 7) != 0)
+ perSegmentSize += (8 - (perSegmentSize & 7));
+ perSegmentBloomFilterSizeBytes = perSegmentSize / 8;
+ perSegmentK = BloomFilter.optimialK(perSegmentSize,
blocksPerSegment + checkBlocksPerSegment);
+ keyCount = origSize;
+ // Now create it.
+ Logger.error(this, "Creating block filter for "+this+":
keys="+(splitfileDataBlocks.length+splitfileCheckBlocks.length)+" main bloom
size "+mainBloomFilterSizeBytes+" bytes, K="+mainBloomK+",
filename="+mainBloomFile+" alt bloom filter: segments: "+segments.length+" each
is "+perSegmentBloomFilterSizeBytes+" bytes k="+perSegmentK);
+ try {
+ tempListener = new SplitFileFetcherKeyListener(this,
keyCount, mainBloomFile, altBloomFile, mainBloomFilterSizeBytes, mainBloomK,
!fetchContext.cacheLocalRequests, localSalt, segments.length,
perSegmentBloomFilterSizeBytes, perSegmentK, persistent, true);
+
+ // Now add the keys
+ int dataKeysIndex = 0;
+ int checkKeysIndex = 0;
+ int segNo = 0;
+ while(dataKeysIndex < splitfileDataBlocks.length) {
+ int dataKeysEnd = dataKeysIndex +
blocksPerSegment;
+ int checkKeysEnd = checkKeysIndex +
checkBlocksPerSegment;
+ dataKeysEnd = Math.min(dataKeysEnd,
splitfileDataBlocks.length);
+ checkKeysEnd = Math.min(checkKeysEnd,
splitfileCheckBlocks.length);
+ for(int j=dataKeysIndex;j<dataKeysEnd;j++)
+
tempListener.addKey(splitfileDataBlocks[j].getNodeKey(), segNo, context);
+ for(int j=checkKeysIndex;j<checkKeysEnd;j++)
+
tempListener.addKey(splitfileCheckBlocks[j].getNodeKey(), segNo, context);
+ segNo++;
+ dataKeysIndex = dataKeysEnd;
+ checkKeysIndex = checkKeysEnd;
+ }
+ tempListener.writeFilters();
+ } catch (IOException e) {
+ throw new FetchException(FetchException.BUCKET_ERROR,
"Unable to write Bloom filters for splitfile");
+ }
}
/** Return the final status of the fetch. Throws an exception, or
returns a
@@ -303,8 +410,10 @@
if(persistent) {
container.activate(cb, 1);
}
+ context.getChkFetchScheduler().removePendingKeys(this, true);
try {
synchronized(this) {
+ if(otherFailure != null) throw otherFailure;
if(finished) {
Logger.error(this, "Was already
finished");
return;
@@ -350,22 +459,23 @@
}
}
- public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly) {
+ public void schedule(ObjectContainer container, ClientContext context)
throws KeyListenerConstructionException {
if(persistent)
container.activate(this, 1);
- if(segments.length > 1)
- regmeOnly = true;
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Scheduling "+this);
+ SendableGet[] getters = new SendableGet[segments.length];
for(int i=0;i<segments.length;i++) {
if(logMINOR)
Logger.minor(this, "Scheduling segment "+i+" :
"+segments[i]);
if(persistent)
container.activate(segments[i], 1);
- segments[i].schedule(container, context, regmeOnly);
+ getters[i] = segments[i].schedule(container, context);
if(persistent)
container.deactivate(segments[i], 1);
}
+ BlockSet blocks = fetchContext.blocks;
+ context.getChkFetchScheduler().register(this, getters,
persistent, true, blocks, false);
}
public void cancel(ObjectContainer container, ClientContext context) {
@@ -385,4 +495,64 @@
return token;
}
+ /**
+ * Make our SplitFileFetcherKeyListener. Returns the one we created in
the
+ * constructor if possible, otherwise makes a new one. We must have
already
+ * constructed one at some point, maybe before a restart.
+ * @throws FetchException
+ */
+ public KeyListener makeKeyListener(ObjectContainer container,
ClientContext context) throws KeyListenerConstructionException {
+ synchronized(this) {
+ if(tempListener != null) {
+ // Recently constructed
+ return tempListener;
+ }
+ try {
+ tempListener =
+ new SplitFileFetcherKeyListener(this,
keyCount, mainBloomFile, altBloomFile, mainBloomFilterSizeBytes, mainBloomK,
!fetchContext.cacheLocalRequests, localSalt, segments.length,
perSegmentBloomFilterSizeBytes, perSegmentK, persistent, false);
+ } catch (IOException e) {
+ Logger.error(this, "Unable to read Bloom filter
for "+this+" attempting to reconstruct...");
+ mainBloomFile.delete();
+ altBloomFile.delete();
+ try {
+ mainBloomFile =
context.fg.makeRandomFile();
+ altBloomFile =
context.fg.makeRandomFile();
+ } catch (IOException e1) {
+ throw new
KeyListenerConstructionException(new
FetchException(FetchException.BUCKET_ERROR, "Unable to create Bloom filter
files in reconstruction", e1));
+ }
+
+ try {
+ tempListener =
+ new
SplitFileFetcherKeyListener(this, keyCount, mainBloomFile, altBloomFile,
mainBloomFilterSizeBytes, mainBloomK, !fetchContext.cacheLocalRequests,
localSalt, segments.length, perSegmentBloomFilterSizeBytes, perSegmentK,
persistent, true);
+ } catch (IOException e1) {
+ throw new
KeyListenerConstructionException(new
FetchException(FetchException.BUCKET_ERROR, "Unable to reconstruct Bloom
filters: "+e1, e1));
+ }
+ }
+ return tempListener;
+ }
+ }
+
+ public synchronized boolean isCancelled(ObjectContainer container) {
+ return finished;
+ }
+
+ public SplitFileFetcherSegment getSegment(int i) {
+ return segments[i];
+ }
+
+ public void removeMyPendingKeys(SplitFileFetcherSegment segment,
ObjectContainer container, ClientContext context) {
+ keyCount = tempListener.killSegment(segment, container,
context);
+ }
+
+ void setKeyCount(int keyCount2, ObjectContainer container) {
+ this.keyCount = keyCount2;
+ if(persistent)
+ container.set(this);
+ }
+
+ public void onFailed(KeyListenerConstructionException e,
ObjectContainer container, ClientContext context) {
+ otherFailure = e.getFetchException();
+ cancel(container, context);
+ }
+
}
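
The constructor above sizes two sets of filters: a main counting filter (2 bits per position, k = 0.7 x bits-per-key) and one small binary filter per segment, sized so that the combined per-segment false-positive chance stays around 1%. A standalone sketch of that arithmetic, using invented numbers (1000 keys, 8 segments) and ignoring the clamping the constructor applies to the per-segment key count:

    // Standalone sketch of the Bloom filter sizing arithmetic; numbers invented.
    public class BloomSizingSketch {
        public static void main(String[] args) {
            int keys = 1000;            // data blocks + check blocks
            int segments = 8;
            int elementsPerKey = 19;    // DEFAULT_MAIN_BLOOM_ELEMENTS_PER_KEY

            // Main filter: ~19 bits per key, k = 0.7 * 19 = 13 hashes, then the
            // byte count is doubled because the counting filter uses 2 bits/position.
            int mainK = (int) (elementsPerKey * 0.7);
            int mainBits = keys * elementsPerKey;
            if ((mainBits & 7) != 0) mainBits += 8 - (mainBits & 7);
            int mainBytes = mainBits / 8 * 2;

            // Per-segment binary filters: ~1% false positives overall, so each
            // segment gets 0.01/segments; bits per key from 0.6185^n ~= p.
            double acceptable = 0.01 / segments;
            int bitsPerKey = (int) Math.ceil(Math.log(acceptable) / Math.log(0.6185));
            int keysPerSegment = keys / segments;
            int segBits = bitsPerKey * keysPerSegment;
            if ((segBits & 7) != 0) segBits += 8 - (segBits & 7);

            System.out.println("main filter: " + mainBytes + " bytes, k=" + mainK);
            System.out.println("per-segment: " + (segBits / 8) + " bytes, "
                    + bitsPerKey + " bits/key, x" + segments + " segments");
        }
    }

With these numbers the main filter comes out at 4750 bytes with k=13, and each per-segment filter at 219 bytes with 14 bits per key.
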
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -44,7 +44,7 @@
* A single segment within a SplitFileFetcher.
* This in turn controls a large number of SplitFileFetcherSubSegment's, which
are registered on the ClientRequestScheduler.
*/
-public class SplitFileFetcherSegment implements FECCallback, GotKeyListener {
+public class SplitFileFetcherSegment implements FECCallback {
private static volatile boolean logMINOR;
final short splitfileType;
@@ -80,6 +80,7 @@
private boolean finishing;
private boolean scheduled;
private final boolean persistent;
+ final int segNum;
// A persistent hashCode is helpful in debugging, and also means we can
put
// these objects into sets etc when we need to.
@@ -92,8 +93,9 @@
private FECCodec codec;
- public SplitFileFetcherSegment(short splitfileType, ClientCHK[]
splitfileDataKeys, ClientCHK[] splitfileCheckKeys, SplitFileFetcher fetcher,
ArchiveContext archiveContext, FetchContext fetchContext, long maxTempLength,
int recursionLevel, ClientRequester requester) throws MetadataParseException,
FetchException {
+ public SplitFileFetcherSegment(short splitfileType, ClientCHK[]
splitfileDataKeys, ClientCHK[] splitfileCheckKeys, SplitFileFetcher fetcher,
ArchiveContext archiveContext, FetchContext fetchContext, long maxTempLength,
int recursionLevel, ClientRequester requester, int segNum) throws
MetadataParseException, FetchException {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ this.segNum = segNum;
this.hashCode = super.hashCode();
this.persistent = fetcher.persistent;
this.parentFetcher = fetcher;
@@ -257,7 +259,11 @@
}
parent.completedBlock(dontNotify, container, context);
if(decodeNow) {
- context.getChkFetchScheduler().removePendingKeys(this,
true);
+ if(persistent)
+ container.activate(parentFetcher, 1);
+ parentFetcher.removeMyPendingKeys(this, container,
context);
+ if(persistent)
+ container.deactivate(parentFetcher, 1);
removeSubSegments(container, context);
decode(container, context);
}
@@ -513,10 +519,6 @@
if(logMINOR) Logger.minor(this, "Permanently failed block:
"+blockNo+" on "+this+" : "+e, e);
boolean allFailed;
// Since we can't keep the key, we need to unregister for it at
this point to avoid a memory leak
- NodeCHK key = getBlockNodeKey(blockNo, container);
- if(key != null)
- // don't complain as may already have been removed e.g.
if we have a decode error in onGotKey; don't NPE for same reason
- context.getChkFetchScheduler().removePendingKey(this,
false, key, container);
synchronized(this) {
if(isFinishing(container)) return; // this failure is
now irrelevant, and cleanup will occur on the decoder thread
if(blockNo < dataKeys.length) {
@@ -567,7 +569,7 @@
seg.removeBlockNum(blockNo, container, false);
SplitFileFetcherSubSegment sub = onNonFatalFailure(e, blockNo,
seg, container, context, sched, maxTries);
if(sub != null) {
- sub.schedule(container, context, false, false);
+ sub.reschedule(container, context);
if(persistent && sub != seg) container.deactivate(sub,
1);
}
}
@@ -591,7 +593,7 @@
}
if(toSchedule != null && !toSchedule.isEmpty()) {
for(SplitFileFetcherSubSegment sub : toSchedule) {
- sub.schedule(container, context, false, false);
+ sub.reschedule(container, context);
if(persistent && sub != seg)
container.deactivate(sub, 1);
}
}
@@ -739,7 +741,7 @@
checkBuckets[i] = null;
}
}
- context.getChkFetchScheduler().removePendingKeys(this, true);
+ parentFetcher.removeMyPendingKeys(this, container, context);
removeSubSegments(container, context);
if(persistent) {
container.set(this);
@@ -750,7 +752,7 @@
container.deactivate(parentFetcher, 1);
}
- public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly) {
+ public SplitFileFetcherSubSegment schedule(ObjectContainer container,
ClientContext context) {
if(persistent) {
container.activate(this, 1);
}
@@ -768,13 +770,13 @@
}
if(persistent)
container.set(this);
- // Schedule(true) will deactivate me, so we need to do
it after storing scheduled.
- seg.schedule(container, context, true, regmeOnly);
if(persistent)
container.deactivate(seg, 1);
+ return seg;
} catch (Throwable t) {
Logger.error(this, "Caught "+t+" scheduling "+this, t);
fail(new FetchException(FetchException.INTERNAL_ERROR,
t), container, context, true);
+ return null;
}
}
@@ -946,14 +948,14 @@
container.activate(sub, 1);
RandomGrabArray rga = sub.getParentGrabArray();
if(sub.getParentGrabArray() == null) {
- sub.schedule(container, context, false,
false);
+ sub.reschedule(container, context);
} else {
// if(logMINOR) {
if(persistent)
container.activate(rga,
1);
if(!rga.contains(sub,
container)) {
Logger.error(this,
"Sub-segment has RGA but isn't registered to it!!: "+sub+" for "+rga);
- sub.schedule(container,
context, false, false);
+
sub.reschedule(container, context);
}
if(persistent)
container.deactivate(rga, 1);
@@ -1117,9 +1119,12 @@
return (Key[]) v.toArray(new Key[v.size()]);
}
- public void onGotKey(Key key, KeyBlock block, ObjectContainer
container, ClientContext context) {
+ /**
+ * @return True if we fetched a block.
+ */
+ public boolean onGotKey(Key key, KeyBlock block, ObjectContainer
container, ClientContext context) {
int blockNum = this.getBlockNumber(key, container);
- if(blockNum < 0) return;
+ if(blockNum < 0) return false;
ClientCHK ckey = this.getBlockKey(blockNum, container);
ClientCHKBlock cb;
int retryCount = getBlockRetryCount(blockNum);
@@ -1146,15 +1151,17 @@
cb = new ClientCHKBlock((CHKBlock)block, ckey);
} catch (CHKVerifyException e) {
this.onFatalFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e), blockNum, null,
container, context);
- return;
+ return false;
}
Bucket data = extract(cb, blockNum, container, context);
- if(data == null) return;
+ if(data == null) return false;
if(!cb.isMetadata()) {
this.onSuccess(data, blockNum, cb, container, context);
+ return true;
} else {
this.onFatalFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"), blockNum, null, container, context);
+ return true;
}
}
@@ -1201,4 +1208,8 @@
for(int i=0;i<checkKeys.length;i++)
container.deactivate(checkKeys[i], 1);
}
+
+ public SplitFileFetcherSubSegment getSubSegmentFor(int blockNum,
ObjectContainer container) {
+ return getSubSegment(getBlockRetryCount(blockNum), container,
false, null);
+ }
}
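
onGotKey() now returns whether the block was actually consumed, which is what lets a Bloom-filter front end try segments in turn. The SplitFileFetcherKeyListener itself is not part of this diff, so the following is only a conceptual sketch of the dispatch implied by the filters set up in SplitFileFetcher; it also ignores the second, per-fetcher salt that the comments there say is applied before the per-segment filters.

    // Conceptual sketch only -- NOT the actual SplitFileFetcherKeyListener.
    import com.db4o.ObjectContainer;

    import freenet.client.async.ClientContext;
    import freenet.client.async.SplitFileFetcherSegment;
    import freenet.keys.Key;
    import freenet.keys.KeyBlock;
    import freenet.support.BloomFilter;

    public class KeyDispatchSketch {
        private final BloomFilter mainFilter;        // counting; covers every remaining key
        private final BloomFilter[] segmentFilters;  // binary; one per segment
        private final SplitFileFetcherSegment[] segments;

        public KeyDispatchSketch(BloomFilter mainFilter, BloomFilter[] segmentFilters,
                SplitFileFetcherSegment[] segments) {
            this.mainFilter = mainFilter;
            this.segmentFilters = segmentFilters;
            this.segments = segments;
        }

        /** @return true if some segment actually consumed the block. */
        public boolean handleBlock(byte[] saltedKey, Key key, KeyBlock block,
                ObjectContainer container, ClientContext context) {
            if (!mainFilter.checkFilter(saltedKey))
                return false;                        // definitely not one of our keys
            for (int i = 0; i < segmentFilters.length; i++) {
                if (!segmentFilters[i].checkFilter(saltedKey))
                    continue;                        // not this segment (or a false positive)
                if (segments[i].onGotKey(key, block, container, context)) {
                    mainFilter.removeKey(saltedKey); // counting filter: forget the found key
                    return true;
                }
            }
            return false;                            // a false positive after all
        }
    }
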
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-08-19 23:20:05 UTC (rev 22045)
+++
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -538,7 +538,11 @@
container.set(blockNums);
if(schedule) {
// Only need to register once for all the blocks.
- context.getChkFetchScheduler().register(null, new
SendableGet[] { this }, false, persistent, true, null, null);
+ try {
+ context.getChkFetchScheduler().register(null,
new SendableGet[] { this }, persistent, true, null, true);
+ } catch (KeyListenerConstructionException e) {
+ Logger.error(this, "Impossible: "+e+" on
"+this, e);
+ }
}
}
@@ -590,7 +594,11 @@
container.set(blockNums);
if(schedule) {
if(dontSchedule) return true;
- context.getChkFetchScheduler().register(null, new
SendableGet[] { this }, false, persistent, true, null, null);
+ try {
+ context.getChkFetchScheduler().register(null,
new SendableGet[] { this }, persistent, true, null, true);
+ } catch (KeyListenerConstructionException e) {
+ Logger.error(this, "Impossible: "+e+" on
"+this, e);
+ }
}
return false;
}
@@ -741,8 +749,12 @@
}
}
- public void schedule(ObjectContainer container, ClientContext context,
boolean firstTime, boolean regmeOnly) {
- getScheduler(context).register(firstTime ? segment : null, new
SendableGet[] { this }, regmeOnly, persistent, true,
segment.blockFetchContext.blocks, null);
+ public void reschedule(ObjectContainer container, ClientContext
context) {
+ try {
+ getScheduler(context).register(null, new SendableGet[]
{ this }, persistent, true, segment.blockFetchContext.blocks, true);
+ } catch (KeyListenerConstructionException e) {
+ Logger.error(this, "Impossible: "+e+" on "+this, e);
+ }
}
public boolean removeBlockNum(int blockNum, ObjectContainer container,
boolean callerActivatesAndSets) {
@@ -821,4 +833,11 @@
return blocks;
}
+ @Override
+ public Key[] listKeys(ObjectContainer container) {
+ if(persistent)
+ container.activate(segment, 1);
+ return segment.listKeys(container);
+ }
+
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKChecker.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKChecker.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/USKChecker.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -94,4 +94,9 @@
public short getPriorityClass() {
return cb.getPriority();
}
+
+ public void onFailed(KeyListenerConstructionException e,
ObjectContainer container, ClientContext context) {
+ onFailure(new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, "IMPOSSIBLE: Failed
to create Bloom filters (we don't have any!)", e), null, container, context);
+ }
+
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKFetcher.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/USKFetcher.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -159,7 +159,7 @@
if(logMINOR)
Logger.minor(this, "Checker == null in
schedule() for "+this, new Exception("debug"));
} else
- checker.schedule(container, context, false);
+ checker.schedule(container, context);
}
public String toString() {
@@ -469,17 +469,17 @@
public void schedule(long delay, ObjectContainer container, final
ClientContext context) {
assert(container == null);
if (delay<=0) {
- schedule(container, context, false);
+ schedule(container, context);
} else {
uskManager.ticker.queueTimedJob(new Runnable() {
public void run() {
- USKFetcher.this.schedule(null, context,
false);
+ USKFetcher.this.schedule(null, context);
}
}, delay);
}
}
- public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly) {
+ public void schedule(ObjectContainer container, ClientContext context) {
USKAttempt[] attempts;
long lookedUp = uskManager.lookup(origUSK);
synchronized(this) {
Modified: branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -91,7 +91,7 @@
usk = usk.copy(edition);
fetcher = manager.getFetcher(usk, ctx, new
USKFetcherWrapper(usk, priority, client), keepLastData);
fetcher.addCallback(this);
- fetcher.schedule(null, context, false); // non-persistent
+ fetcher.schedule(null, context); // non-persistent
}
public void cancel(ObjectContainer container, ClientContext context) {
@@ -118,7 +118,7 @@
return token;
}
- public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly) {
+ public void schedule(ObjectContainer container, ClientContext context) {
start(context.uskManager, context);
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKInserter.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/USKInserter.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -78,7 +78,7 @@
if(finished) return;
fetcher =
context.uskManager.getFetcherForInsertDontSchedule(pubUSK,
parent.priorityClass, this, parent.getClient(), container, context);
}
- fetcher.schedule(container, context, false);
+ fetcher.schedule(container, context);
}
public void onFoundEdition(long l, USK key, ObjectContainer container,
ClientContext context, boolean lastContentWasMetadata, short codec, byte[]
hisData) {
Modified: branches/db4o/freenet/src/freenet/client/async/USKManager.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKManager.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/USKManager.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -140,7 +140,7 @@
fetcher.cancel(null, context);
}
}
- if(sched != null) sched.schedule(null, context, false);
+ if(sched != null) sched.schedule(null, context);
}
void update(final USK origUSK, final long number, final ClientContext
context) {
@@ -216,7 +216,7 @@
if(fetcher != null) {
ticker.queueTimedJob(new Runnable() {
public void run() {
- fetcher.schedule(null, context, false);
+ fetcher.schedule(null, context);
}
}, 0);
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKRetriever.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKRetriever.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/client/async/USKRetriever.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -48,7 +48,7 @@
SingleFileFetcher getter =
(SingleFileFetcher)
SingleFileFetcher.create(this, this, new ClientMetadata(), uri, ctx, new
ArchiveContext(ctx.maxArchiveLevels),
ctx.maxNonSplitfileRetries, 0,
true, l, true, null, false, null, context);
- getter.schedule(null, context, false);
+ getter.schedule(null, context);
} catch (MalformedURLException e) {
Logger.error(this, "Impossible: "+e, e);
} catch (FetchException e) {
Modified: branches/db4o/freenet/src/freenet/clients/http/StatisticsToadlet.java
===================================================================
--- branches/db4o/freenet/src/freenet/clients/http/StatisticsToadlet.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/clients/http/StatisticsToadlet.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -314,9 +314,6 @@
HTMLNode databaseJobsInfobox =
nextTableCell.addChild("div", "class", "infobox");
drawDatabaseJobsBox(databaseJobsInfobox);
- HTMLNode storeCheckerJobsInfobox =
nextTableCell.addChild("div", "class", "infobox");
- drawStoreCheckerJobsBox(storeCheckerJobsInfobox);
-
// peer distribution box
overviewTableRow = overviewTable.addChild("tr");
nextTableCell = overviewTableRow.addChild("td",
"class", "first");
@@ -478,24 +475,6 @@
}
}
- private void drawStoreCheckerJobsBox(HTMLNode node) {
- node.addChild("div", "class", "infobox-header",
l10n("storeJobsByPriority"));
- HTMLNode threadsInfoboxContent = node.addChild("div", "class",
"infobox-content");
- int[] jobsByPriority =
core.datastoreCheckerExecutor.runningJobs();
-
- HTMLNode threadsByPriorityTable =
threadsInfoboxContent.addChild("table", "border", "0");
- HTMLNode row = threadsByPriorityTable.addChild("tr");
-
- row.addChild("th", l10n("priority"));
- row.addChild("th", l10n("waiting"));
-
- for(int i=0; i<jobsByPriority.length; i++) {
- row = threadsByPriorityTable.addChild("tr");
- row.addChild("td", String.valueOf(i));
- row.addChild("td", String.valueOf(jobsByPriority[i]));
- }
- }
-
private void drawStoreSizeBox(HTMLNode storeSizeInfobox, double loc,
long nodeUptimeSeconds) {
storeSizeInfobox.addChild("div", "class", "infobox-header",
"Datastore");
Modified: branches/db4o/freenet/src/freenet/node/LowLevelGetException.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/LowLevelGetException.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/node/LowLevelGetException.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -62,7 +62,7 @@
/** Failure code */
public final int code;
- LowLevelGetException(int code, String message, Throwable t) {
+ public LowLevelGetException(int code, String message, Throwable t) {
super(message, t);
this.code = code;
}
Modified: branches/db4o/freenet/src/freenet/node/Node.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/Node.java 2008-08-19 23:20:05 UTC
(rev 22045)
+++ branches/db4o/freenet/src/freenet/node/Node.java 2008-08-20 00:24:11 UTC
(rev 22046)
@@ -112,6 +112,7 @@
import freenet.support.LRUHashtable;
import freenet.support.LRUQueue;
import freenet.support.Logger;
+import freenet.support.NullObject;
import freenet.support.OOMHandler;
import freenet.support.PooledExecutor;
import freenet.support.ShortBuffer;
@@ -801,14 +802,12 @@
Db4o.configure().objectClass(freenet.client.async.RegisterMe.class).objectField("core").indexed(true);
Db4o.configure().objectClass(freenet.client.async.RegisterMe.class).objectField("key").indexed(true);
Db4o.configure().objectClass(freenet.client.async.PersistentCooldownQueueItem.class).objectField("time").indexed(true);
-		Db4o.configure().objectClass(freenet.client.async.PendingKeyItem.class).objectField("key").indexed(true);
-		Db4o.configure().objectClass(freenet.client.async.PendingKeyItem.class).objectField("fullKeyAsBytes").indexed(true);
Db4o.configure().objectClass(freenet.client.FECJob.class).objectField("priority").indexed(true);
Db4o.configure().objectClass(freenet.client.FECJob.class).objectField("addedTime").indexed(true);
Db4o.configure().objectClass(freenet.client.FECJob.class).objectField("queue").indexed(true);
-		Db4o.configure().objectClass(freenet.client.async.PendingKeyItem.class).objectField("nodeDBHandle").indexed(true);
Db4o.configure().objectClass(freenet.client.async.InsertCompressor.class).objectField("nodeDBHandle").indexed(true);
Db4o.configure().objectClass(freenet.node.fcp.FCPClient.class).objectField("name").indexed(true);
+		Db4o.configure().objectClass(freenet.client.async.DatastoreCheckerItem.class).objectField("prio").indexed(true);
/** Maybe we want a different query evaluation mode?
* At the moment, a big splitfile insert will result in one
SingleBlockInserter
* for every key, which means one RegisterMe for each ... this
results in a long pause
Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-08-19
23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-08-20
00:24:11 UTC (rev 22046)
@@ -18,6 +18,7 @@
import freenet.client.async.ClientRequestScheduler;
import freenet.client.async.DBJob;
import freenet.client.async.DBJobRunner;
+import freenet.client.async.DatastoreChecker;
import freenet.client.async.HealingQueue;
import freenet.client.async.InsertCompressor;
import freenet.client.async.SimpleHealingQueue;
@@ -95,7 +96,7 @@
private boolean downloadAllowedEverywhere;
private File[] uploadAllowedDirs;
private boolean uploadAllowedEverywhere;
- final FilenameGenerator tempFilenameGenerator;
+ public final FilenameGenerator tempFilenameGenerator;
public final BucketFactory tempBucketFactory;
public final Node node;
final NodeStats nodeStats;
@@ -124,11 +125,7 @@
* Note that the priorities are thread priorities, not request
priorities.
*/
public transient final PrioritizedSerialExecutor clientDatabaseExecutor;
- /**
- * Whenever a new request is added, we have to check the datastore. We
funnel all such access
- * through this thread. Note that the priorities are request
priorities, not thread priorities.
- */
- public transient final PrioritizedSerialExecutor
datastoreCheckerExecutor;
+ public final DatastoreChecker storeChecker;
public transient final ClientContext clientContext;
@@ -150,7 +147,7 @@
fecQueue = FECQueue.create(node.nodeDBHandle, container);
this.backgroundBlockEncoder = new BackgroundBlockEncoder();
clientDatabaseExecutor = new
PrioritizedSerialExecutor(NativeThread.NORM_PRIORITY,
NativeThread.MAX_PRIORITY+1, NativeThread.NORM_PRIORITY, true);
- datastoreCheckerExecutor = new
PrioritizedSerialExecutor(NativeThread.NORM_PRIORITY,
RequestStarter.NUMBER_OF_PRIORITY_CLASSES, 0, false);
+ storeChecker = new DatastoreChecker(node);
byte[] pwdBuf = new byte[16];
random.nextBytes(pwdBuf);
this.formPassword = Base64.encode(pwdBuf);
@@ -234,6 +231,7 @@
!Node.DONT_CACHE_LOCAL_REQUESTS), RequestStarter.PREFETCH_PRIORITY_CLASS, 512
/* FIXME make configurable */);
clientContext = new ClientContext(this);
+ storeChecker.setContext(clientContext);
requestStarters = new RequestStarterGroup(node, this,
portNumber, random, config, throttleFS, clientContext);
clientContext.init(requestStarters);
InsertCompressor.load(container, clientContext);
@@ -437,7 +435,7 @@
}, NativeThread.NORM_PRIORITY, false);
persister.start();
- datastoreCheckerExecutor.start(node.executor, "Datastore
checker");
+ storeChecker.start(node.executor, "Datastore checker");
clientDatabaseExecutor.start(node.executor, "Client database
access thread");
if(fcpServer != null)
fcpServer.maybeStart();
Modified: branches/db4o/freenet/src/freenet/node/SendableGet.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-08-19
23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-08-20
00:24:11 UTC (rev 22046)
@@ -31,6 +31,14 @@
return key.getNodeKey();
}
+ /**
+ * What keys are we interested in? For purposes of checking the
datastore.
+ * This is in SendableGet, *not* KeyListener, in order to deal with it
in
+ * smaller chunks.
+ * @param container Database handle.
+ */
+ public abstract Key[] listKeys(ObjectContainer container);
+
/** Get the fetch context (settings) object. */
public abstract FetchContext getContext();
Modified: branches/db4o/freenet/src/freenet/node/Version.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/Version.java 2008-08-19 23:20:05 UTC
(rev 22045)
+++ branches/db4o/freenet/src/freenet/node/Version.java 2008-08-20 00:24:11 UTC
(rev 22046)
@@ -24,17 +24,17 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 1155;
+ private static final int buildNumber = 1158;
/** Oldest build of Fred we will talk to */
private static final int oldLastGoodBuild = 1154;
- private static final int newLastGoodBuild = 1155;
+ private static final int newLastGoodBuild = 1158;
static final long transitionTime;
static {
final Calendar _cal =
Calendar.getInstance(TimeZone.getTimeZone("GMT"));
// year, month - 1 (or constant), day, hour, minute, second
- _cal.set( 2008, Calendar.AUGUST, 13, 0, 0, 0 );
+ _cal.set( 2008, Calendar.AUGUST, 20, 0, 0, 0 );
transitionTime = _cal.getTimeInMillis();
}
Copied: branches/db4o/freenet/src/freenet/support/BinaryBloomFilter.java (from
rev 21911,
branches/saltedhashstore/freenet/src/freenet/support/BinaryBloomFilter.java)
===================================================================
--- branches/db4o/freenet/src/freenet/support/BinaryBloomFilter.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/support/BinaryBloomFilter.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -0,0 +1,86 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.support;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+
+/**
+ * @author sdiz
+ */
+public class BinaryBloomFilter extends BloomFilter {
+ /**
+ * Constructor
+ *
+ * @param length
+ * length in bits
+ */
+ protected BinaryBloomFilter(int length, int k) {
+ super(length, k);
+ filter = ByteBuffer.allocate(length / 8);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param file
+ * disk file
+ * @param length
+ * length in bits
+ * @throws IOException
+ */
+ protected BinaryBloomFilter(File file, int length, int k) throws
IOException {
+ super(length, k);
+ if (!file.exists() || file.length() != length / 8)
+ needRebuild = true;
+
+ RandomAccessFile raf = new RandomAccessFile(file, "rw");
+ raf.setLength(length / 8);
+ filter = raf.getChannel().map(MapMode.READ_WRITE, 0, length /
8).load();
+ }
+
+ public BinaryBloomFilter(ByteBuffer slice, int length, int k) {
+ super(length, k);
+ filter = slice;
+ }
+
+ @Override
+ public void removeKey(byte[] key) {
+ // ignore
+ }
+
+ @Override
+ protected boolean getBit(int offset) {
+ return (filter.get(offset / 8) & (1 << (offset % 8))) != 0;
+ }
+
+ @Override
+ protected void setBit(int offset) {
+ byte b = filter.get(offset / 8);
+ b |= 1 << (offset % 8);
+ filter.put(offset / 8, b);
+ }
+
+ @Override
+ protected void unsetBit(int offset) {
+ // NO-OP
+ }
+
+ @Override
+ public void fork(int k) {
+ lock.writeLock().lock();
+ try {
+ File tempFile = File.createTempFile("bloom-", ".tmp");
+ tempFile.deleteOnExit();
+ forkedFilter = new BinaryBloomFilter(tempFile, length,
k);
+ } catch (IOException e) {
+ forkedFilter = new BinaryBloomFilter(length, k);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+}
Property changes on:
branches/db4o/freenet/src/freenet/support/BinaryBloomFilter.java
___________________________________________________________________
Name: svn:mergeinfo
+
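
BinaryBloomFilter keeps its bit array in a memory-mapped file, which is what lets the splitfile filters above survive a restart. A minimal standalone sketch of that mapping pattern; the file name and size are invented:

    // Sketch of the memory-mapping pattern used by BinaryBloomFilter(File, ...).
    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel.MapMode;

    public class MappedFilterSketch {
        public static void main(String[] args) throws IOException {
            File file = new File("filter-sketch.bin");
            int bytes = 1024;                         // filter length in bits / 8
            RandomAccessFile raf = new RandomAccessFile(file, "rw");
            raf.setLength(bytes);                     // typically zero-filled on creation
            MappedByteBuffer buf =
                    raf.getChannel().map(MapMode.READ_WRITE, 0, bytes).load();
            buf.put(0, (byte) (buf.get(0) | 1));      // set bit 0, written through to the file
            buf.force();                              // flush dirty pages to disk
            raf.close();
        }
    }
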
Copied: branches/db4o/freenet/src/freenet/support/BloomFilter.java (from rev
21911, branches/saltedhashstore/freenet/src/freenet/support/BloomFilter.java)
===================================================================
--- branches/db4o/freenet/src/freenet/support/BloomFilter.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/support/BloomFilter.java 2008-08-20
00:24:11 UTC (rev 22046)
@@ -0,0 +1,185 @@
+package freenet.support;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Random;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.spaceroots.mantissa.random.MersenneTwister;
+
+public abstract class BloomFilter {
+ protected ByteBuffer filter;
+
+ /** Number of hash functions */
+ protected final int k;
+ protected final int length;
+
+ protected ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ public static BloomFilter createFilter(File file, int length, int k,
boolean counting) throws IOException {
+ if (k == 0 || length == 0)
+ return new NullBloomFilter(length, k);
+ if (counting)
+ return new CountingBloomFilter(file, length, k);
+ else
+ return new BinaryBloomFilter(file, length, k);
+ }
+
+ protected BloomFilter(int length, int k) {
+ if (length % 8 != 0)
+ throw new IllegalArgumentException();
+
+ this.length = length;
+ this.k = k;
+ }
+
+ //-- Core
+ public void addKey(byte[] key) {
+ Random hashes = getHashes(key);
+ lock.writeLock().lock();
+ try {
+ for (int i = 0; i < k; i++)
+ setBit(hashes.nextInt(length));
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ if (forkedFilter != null)
+ forkedFilter.addKey(key);
+ }
+
+ public boolean checkFilter(byte[] key) {
+ Random hashes = getHashes(key);
+ lock.readLock().lock();
+ try {
+ for (int i = 0; i < k; i++)
+ if (!getBit(hashes.nextInt(length)))
+ return false;
+ } finally {
+ lock.readLock().unlock();
+ }
+ return true;
+ }
+
+ public void removeKey(byte[] key) {
+ Random hashes = getHashes(key);
+ lock.writeLock().lock();
+ try {
+ for (int i = 0; i < k; i++)
+ unsetBit(hashes.nextInt(length));
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ if (forkedFilter != null)
+ forkedFilter.removeKey(key);
+ }
+
+ //-- Bits and Hashes
+ protected abstract boolean getBit(int offset);
+
+ protected abstract void setBit(int offset);
+
+ protected abstract void unsetBit(int offset);
+
+	// Weird implementations should override
+ public void unsetAll() {
+ int x = filter.limit();
+ for(int i=0;i<x;i++)
+ filter.put(i, (byte)0);
+ }
+
+ protected Random getHashes(byte[] key) {
+ return new MersenneTwister(key);
+ }
+
+ //-- Fork & Merge
+ protected BloomFilter forkedFilter;
+
+	/**
+	 * Create an empty copy of this bloom filter. New updates are written to both
+	 * filters; the copy is merged back on #merge().
+	 */
+ public abstract void fork(int k);
+
+ public void merge() {
+ lock.writeLock().lock();
+ try {
+ if (forkedFilter == null)
+ return;
+
+ filter.position(0);
+ forkedFilter.filter.position(0);
+
+ filter.put(forkedFilter.filter);
+
+ filter.position(0);
+ forkedFilter.finalize();
+ forkedFilter = null;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void discard() {
+ lock.writeLock().lock();
+ try {
+ if (forkedFilter == null)
+ return;
+ forkedFilter.finalize();
+ forkedFilter = null;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ //-- Misc.
+ /**
+ * Calculate optimal K value
+ *
+ * @param filterLength
+ * filter length in bits
+ * @param maxKey
+ * @return optimal K
+ */
+ // may return 0 if the length is too short
+ public static int optimialK(int filterLength, long maxKey) {
+ long k = Math.round(Math.log(2) * filterLength / maxKey);
+
+ if (k > 64)
+ k = 64;
+
+ return (int) k;
+ }
+
+ public int getK() {
+ return k;
+ }
+
+ protected boolean needRebuild;
+
+ public boolean needRebuild() {
+ boolean _needRebuild = needRebuild;
+ needRebuild = false;
+ return _needRebuild;
+
+ }
+
+ public void force() {
+ if (filter instanceof MappedByteBuffer) {
+ ((MappedByteBuffer) filter).force();
+ }
+ }
+
+ @Override
+ protected void finalize() {
+ if (filter != null) {
+ force();
+ }
+ filter = null;
+ forkedFilter = null;
+ }
+}
Property changes on: branches/db4o/freenet/src/freenet/support/BloomFilter.java
___________________________________________________________________
Name: svn:mergeinfo
+
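
A short usage sketch of the new BloomFilter API, with an invented file name and sizing; the k value comes from optimialK() as defined above:

    // Usage sketch only; the file name and sizing are invented for illustration.
    import java.io.File;
    import java.io.IOException;

    import freenet.support.BloomFilter;

    public class BloomFilterUsageSketch {
        public static void main(String[] args) throws IOException {
            int keys = 10000;
            int lengthInBits = keys * 16;                       // multiple of 8
            int k = BloomFilter.optimialK(lengthInBits, keys);  // ~11 hashes
            BloomFilter filter = BloomFilter.createFilter(
                    new File("bloom-sketch.tmp"), lengthInBits, k, false); // binary filter
            byte[] someKey = new byte[32];                      // e.g. a routing key
            filter.addKey(someKey);
            // checkFilter() never gives a false negative; "true" may be a false positive.
            System.out.println("maybe present: " + filter.checkFilter(someKey));
            System.out.println("k = " + filter.getK());
            filter.force();                                     // flush the mapped file
        }
    }
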
Copied: branches/db4o/freenet/src/freenet/support/CountingBloomFilter.java
(from rev 21911,
branches/saltedhashstore/freenet/src/freenet/support/CountingBloomFilter.java)
===================================================================
--- branches/db4o/freenet/src/freenet/support/CountingBloomFilter.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/support/CountingBloomFilter.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -0,0 +1,102 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.support;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+
+/**
+ * @author sdiz
+ */
+public class CountingBloomFilter extends BloomFilter {
+ /**
+ * Constructor
+ *
+ * @param length
+ * length in bits
+ */
+ public CountingBloomFilter(int length, int k) {
+ super(length, k);
+ filter = ByteBuffer.allocate(length / 8 * 2);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param file
+ * disk file
+ * @param length
+ * length in bits
+ * @throws IOException
+ */
+ protected CountingBloomFilter(File file, int length, int k) throws
IOException {
+ super(length, k);
+ int fileLength = length / 8 * 2;
+ if (!file.exists() || file.length() != fileLength)
+ needRebuild = true;
+
+ RandomAccessFile raf = new RandomAccessFile(file, "rw");
+ raf.setLength(fileLength);
+ filter = raf.getChannel().map(MapMode.READ_WRITE, 0,
fileLength).load();
+ }
+
+ public CountingBloomFilter(int length, int k, byte[] buffer) {
+ super(length, k);
+ assert(buffer.length == length / 8 * 2);
+ filter = ByteBuffer.wrap(buffer);
+ }
+
+ @Override
+ protected boolean getBit(int offset) {
+ byte b = filter.get(offset / 8 * 2);
+ byte v = (byte) ((b >>> offset % 4 * 2) & 3);
+
+ return v != 0;
+ }
+
+ @Override
+ protected void setBit(int offset) {
+ byte b = filter.get(offset / 8 * 2);
+ byte v = (byte) ((b >>> offset % 4 * 2) & 3);
+
+ if (v == 3)
+ return; // overflow
+
+ b &= ~(3 << offset % 4 * 2); // unset bit
+ b |= (v + 1) << offset % 4 * 2; // set bit
+
+ filter.put(offset / 8 * 2, b);
+ }
+
+ @Override
+ protected void unsetBit(int offset) {
+ byte b = filter.get(offset / 8 * 2);
+ byte v = (byte) ((b >>> offset % 4 * 2) & 3);
+
+ if (v == 0 || v == 3)
+ return; // overflow / underflow
+
+ b &= ~(3 << offset % 4 * 2); // unset bit
+ b |= (v - 1) << offset % 4 * 2; // set bit
+
+ filter.put(offset / 8 * 2, b);
+ }
+
+ @Override
+ public void fork(int k) {
+ lock.writeLock().lock();
+ try {
+ File tempFile = File.createTempFile("bloom-", ".tmp");
+ tempFile.deleteOnExit();
+ forkedFilter = new CountingBloomFilter(tempFile,
length, k);
+ } catch (IOException e) {
+ forkedFilter = new CountingBloomFilter(length, k);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+}
Property changes on:
branches/db4o/freenet/src/freenet/support/CountingBloomFilter.java
___________________________________________________________________
Name: svn:mergeinfo
+
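
The 2-bit counters are what make removeKey() meaningful: a position is incremented on addKey() and decremented again when a key is forgotten, saturating at 3 so it never wraps. A conceptual sketch of those semantics using one int per position; the real class packs the counters into a byte buffer and hashes the key with a MersenneTwister rather than java.util.Random:

    // Conceptual sketch of counting-bloom semantics, not the packed byte layout.
    import java.util.Arrays;
    import java.util.Random;

    public class CountingSemanticsSketch {
        private final int[] counters;
        private final int k;

        public CountingSemanticsSketch(int positions, int k) {
            this.counters = new int[positions];
            this.k = k;
        }

        private int[] positions(byte[] key) {
            Random hashes = new Random(Arrays.hashCode(key)); // stand-in hash
            int[] out = new int[k];
            for (int i = 0; i < k; i++)
                out[i] = hashes.nextInt(counters.length);
            return out;
        }

        public void addKey(byte[] key) {
            for (int p : positions(key))
                if (counters[p] < 3) counters[p]++;   // saturate, like the 2-bit counter
        }

        public void removeKey(byte[] key) {
            for (int p : positions(key))
                if (counters[p] > 0 && counters[p] < 3) counters[p]--; // skip saturated
        }

        public boolean checkFilter(byte[] key) {
            for (int p : positions(key))
                if (counters[p] == 0) return false;
            return true;
        }
    }
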
Copied: branches/db4o/freenet/src/freenet/support/NullBloomFilter.java (from
rev 21911,
branches/saltedhashstore/freenet/src/freenet/support/NullBloomFilter.java)
===================================================================
--- branches/db4o/freenet/src/freenet/support/NullBloomFilter.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/support/NullBloomFilter.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -0,0 +1,59 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.support;
+
+/**
+ * @author sdiz
+ */
+public class NullBloomFilter extends BloomFilter {
+ protected NullBloomFilter(int length, int k) {
+ super(length, k);
+ }
+
+ @Override
+ public boolean checkFilter(byte[] key) {
+ return true;
+ }
+
+ @Override
+ public void addKey(byte[] key) {
+ // ignore
+ }
+
+ @Override
+ public void removeKey(byte[] key) {
+ // ignore
+ }
+
+ @Override
+ protected boolean getBit(int offset) {
+ // ignore
+ return true;
+ }
+
+ @Override
+ protected void setBit(int offset) {
+ // ignore
+ }
+
+ @Override
+ protected void unsetBit(int offset) {
+ // ignore
+ }
+
+ @Override
+ public void fork(int k) {
+ return;
+ }
+
+ @Override
+ public void discard() {
+ return;
+ }
+
+ @Override
+ public void merge() {
+ return;
+ }
+}
Modified: branches/db4o/freenet/src/freenet/support/io/FilenameGenerator.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/io/FilenameGenerator.java
2008-08-19 23:20:05 UTC (rev 22045)
+++ branches/db4o/freenet/src/freenet/support/io/FilenameGenerator.java
2008-08-20 00:24:11 UTC (rev 22046)
@@ -81,6 +81,13 @@
public File getFilename(long id) {
return new File(tmpDir, prefix + Long.toHexString(id));
}
+
+ public File makeRandomFile() throws IOException {
+ while(true) {
+ File file = getFilename(makeRandomFilename());
+ if(file.createNewFile()) return file;
+ }
+ }
public boolean matches(File file) {
return getID(file) != -1;