Author: toad
Date: 2008-08-21 12:37:19 +0000 (Thu, 21 Aug 2008)
New Revision: 22070
Added:
branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java
branches/db4o/freenet/src/freenet/client/async/DatastoreCheckerItem.java
branches/db4o/freenet/src/freenet/client/async/HasKeyListener.java
branches/db4o/freenet/src/freenet/client/async/KeyListener.java
branches/db4o/freenet/src/freenet/client/async/KeyListenerConstructionException.java
branches/db4o/freenet/src/freenet/client/async/SingleKeyListener.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
Log:
AAAAAAAAAAAARGH! Missing files!
Added: branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,385 @@
+package freenet.client.async;
+
+import java.util.ArrayList;
+
+import com.db4o.ObjectContainer;
+import com.db4o.ObjectSet;
+import com.db4o.query.Query;
+
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.keys.NodeSSK;
+import freenet.node.Node;
+import freenet.node.PrioRunnable;
+import freenet.node.RequestStarter;
+import freenet.node.SendableGet;
+import freenet.support.Executor;
+import freenet.support.Logger;
+import freenet.support.io.NativeThread;
+
+/**
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ */
+public class DatastoreChecker implements PrioRunnable {
+
+ static final int MAX_PERSISTENT_KEYS = 1024;
+
+ /** List of arrays of keys to check for persistent requests. PARTIAL:
+ * When we run out we will look up some more DatastoreCheckerItem's. */
+ private final ArrayList<Key[]>[] persistentKeys;
+ /** List of persistent requests which we will call finishRegister() for
+ * when we have checked the keys lists. PARTIAL: When we run out we
+ * will look up some more DatastoreCheckerItem's. Deactivated. */
+ private final ArrayList<SendableGet>[] persistentGetters;
+ private final ArrayList<Boolean>[] persistentDontCache;
+ private final ArrayList<ClientRequestScheduler>[] persistentSchedulers;
+ private final ArrayList<DatastoreCheckerItem>[] persistentCheckerItems;
+ private final ArrayList<BlockSet>[] persistentBlockSets;
+
+ /** List of arrays of keys to check for transient requests. */
+ private final ArrayList<Key[]>[] transientKeys;
+ /** List of transient requests which we will call finishRegister() for
+ * when we have checked the keys lists. */
+ private final ArrayList<SendableGet>[] transientGetters;
+ private final ArrayList<BlockSet>[] transientBlockSets;
+
+ private ClientContext context;
+ private final Node node;
+
+ public synchronized void setContext(ClientContext context) {
+ this.context = context;
+ }
+
+ public DatastoreChecker(Node node) {
+ this.node = node;
+ int priorities = RequestStarter.NUMBER_OF_PRIORITY_CLASSES;
+ persistentKeys = new ArrayList[priorities];
+ for(int i=0;i<priorities;i++)
+ persistentKeys[i] = new ArrayList<Key[]>();
+ persistentGetters = new ArrayList[priorities];
+ for(int i=0;i<priorities;i++)
+ persistentGetters[i] = new ArrayList<SendableGet>();
+ persistentDontCache = new ArrayList[priorities];
+ for(int i=0;i<priorities;i++)
+ persistentDontCache[i] = new ArrayList<Boolean>();
+ persistentSchedulers = new ArrayList[priorities];
+ for(int i=0;i<priorities;i++)
+ persistentSchedulers[i] = new
ArrayList<ClientRequestScheduler>();
+ persistentCheckerItems = new ArrayList[priorities];
+ for(int i=0;i<priorities;i++)
+ persistentCheckerItems[i] = new
ArrayList<DatastoreCheckerItem>();
+ persistentBlockSets = new ArrayList[priorities];
+ for(int i=0;i<priorities;i++)
+ persistentBlockSets[i] = new ArrayList<BlockSet>();
+ transientKeys = new ArrayList[priorities];
+ for(int i=0;i<priorities;i++)
+ transientKeys[i] = new ArrayList<Key[]>();
+ transientGetters = new ArrayList[priorities];
+ for(int i=0;i<priorities;i++)
+ transientGetters[i] = new ArrayList<SendableGet>();
+ transientBlockSets = new ArrayList[priorities];
+ for(int i=0;i<priorities;i++)
+ transientBlockSets[i] = new ArrayList<BlockSet>();
+ }
+
+ private final DBJob loader = new DBJob() {
+
+ public void run(ObjectContainer container, ClientContext
context) {
+ loadPersistentRequests(container, context);
+ }
+
+ };
+
+ public void loadPersistentRequests(ObjectContainer container, final
ClientContext context) {
+ boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ int totalSize = 0;
+ synchronized(this) {
+ for(int i=0;i<persistentKeys.length;i++) {
+ for(int j=0;j<persistentKeys[i].size();j++)
+ totalSize +=
persistentKeys[i].get(j).length;
+ }
+ if(totalSize > MAX_PERSISTENT_KEYS) {
+ if(logMINOR) Logger.minor(this, "Persistent
datastore checker queue alreadyfull");
+ return;
+ }
+ }
+ for(short p = RequestStarter.MAXIMUM_PRIORITY_CLASS; p <
RequestStarter.MINIMUM_PRIORITY_CLASS; p++) {
+ final short prio = p;
+ Query query = container.query();
+ query.constrain(DatastoreCheckerItem.class);
+
query.descend("nodeDBHandle").constrain(context.nodeDBHandle).
+ and(query.descend("prio").constrain(prio));
+ ObjectSet<DatastoreCheckerItem> results =
query.execute();
+ for(DatastoreCheckerItem item : results) {
+ if(item.chosenBy == context.bootID) continue;
+ SendableGet getter = item.getter;
+ BlockSet blocks = item.blocks;
+ container.activate(getter, 1);
+ boolean dontCache = getter.dontCache();
+ ClientRequestScheduler sched =
getter.getScheduler(context);
+ synchronized(this) {
+
if(persistentGetters[prio].contains(getter)) continue;
+ }
+ Key[] keys = getter.listKeys(container);
+ // FIXME check the store bloom filter using
store.probablyInStore().
+ item.chosenBy = context.bootID;
+ container.set(item);
+ synchronized(this) {
+
if(persistentGetters[prio].contains(getter)) continue;
+ ArrayList<Key> finalKeysToCheck = new
ArrayList<Key>();
+ for(Key key : keys) {
+ key = key.cloneKey();
+ finalKeysToCheck.add(key);
+ }
+ Key[] finalKeys =
+ finalKeysToCheck.toArray(new
Key[finalKeysToCheck.size()]);
+ persistentKeys[prio].add(finalKeys);
+ persistentGetters[prio].add(getter);
+
persistentDontCache[prio].add(dontCache);
+ persistentSchedulers[prio].add(sched);
+ persistentCheckerItems[prio].add(item);
+ persistentBlockSets[prio].add(blocks);
+ if(totalSize == 0)
+ notifyAll();
+ totalSize += finalKeys.length;
+ if(totalSize > MAX_PERSISTENT_KEYS) {
+ if(trimPersistentQueue(prio,
container)) return;
+ notifyAll();
+ }
+ }
+ container.deactivate(getter, 1);
+ }
+ }
+ }
+
+ /**
+ * Trim the queue of persistent requests until it is just over the
limit.
+ * @param minPrio Only drop from priorities lower than this one.
+ * @return True unless the queue is under the limit.
+ */
+ private boolean trimPersistentQueue(short prio, ObjectContainer
container) {
+ synchronized(this) {
+ int preQueueSize = 0;
+ for(int i=0;i<prio;i++) {
+ for(int x=0;x<persistentKeys[i].size();x++)
+ preQueueSize +=
persistentKeys[i].get(x).length;
+ }
+ if(preQueueSize > MAX_PERSISTENT_KEYS) {
+ // Dump everything
+ for(int i=prio+1;i<persistentKeys.length;i++) {
+ while(!persistentKeys[i].isEmpty()) {
+ int idx =
persistentKeys[i].size() - 1;
+ DatastoreCheckerItem item =
persistentCheckerItems[i].remove(idx);
+
persistentSchedulers[i].remove(idx);
+
persistentDontCache[i].remove(idx);
+
persistentGetters[i].remove(idx);
+ persistentKeys[i].remove(idx);
+
persistentBlockSets[i].remove(idx);
+ item.chosenBy = 0;
+ container.set(item);
+ }
+ }
+ return true;
+ } else {
+ int postQueueSize = 0;
+ for(int i=prio+1;i<persistentKeys.length;i++) {
+ for(int
x=0;x<persistentKeys[i].size();x++)
+ postQueueSize +=
persistentKeys[i].get(x).length;
+ }
+ if(postQueueSize + preQueueSize <
MAX_PERSISTENT_KEYS)
+ return false;
+ // Need to dump some stuff.
+ for(int i=persistentKeys.length-1;i>prio;i--) {
+ while(!persistentKeys[i].isEmpty()) {
+ int idx =
persistentKeys[i].size() - 1;
+ DatastoreCheckerItem item =
persistentCheckerItems[i].remove(idx);
+
persistentSchedulers[i].remove(idx);
+
persistentDontCache[i].remove(idx);
+
persistentGetters[i].remove(idx);
+ Key[] keys =
persistentKeys[i].remove(idx);
+
persistentBlockSets[i].remove(idx);
+ item.chosenBy = 0;
+ container.set(item);
+ if(postQueueSize + preQueueSize
- keys.length < MAX_PERSISTENT_KEYS) {
+ return false;
+ }
+ }
+ }
+ // Still over the limit.
+ return true;
+ }
+ }
+ }
+
+ public void queueTransientRequest(SendableGet getter, BlockSet blocks) {
+ Key[] checkKeys = getter.listKeys(null);
+ short prio = getter.getPriorityClass(null);
+ // FIXME check using store.probablyInStore
+ ArrayList<Key> finalKeysToCheck = new ArrayList<Key>();
+ synchronized(this) {
+ for(Key key : checkKeys) {
+ finalKeysToCheck.add(key);
+ }
+ transientGetters[prio].add(getter);
+ transientKeys[prio].add(finalKeysToCheck.toArray(new
Key[finalKeysToCheck.size()]));
+ transientBlockSets[prio].add(blocks);
+ notifyAll();
+ }
+ }
+
+ /**
+ * Queue a persistent request. We will store a DatastoreCheckerItem,
then
+ * check the datastore (on the datastore checker thread), and then call
+ * finishRegister() (on the database thread). Caller must have already
+ * stored and registered the HasKeyListener if any.
+ * @param getter
+ */
+ public void queuePersistentRequest(SendableGet getter, BlockSet blocks,
ObjectContainer container) {
+ Key[] checkKeys = getter.listKeys(container);
+ short prio = getter.getPriorityClass(container);
+ boolean dontCache = getter.dontCache();
+ ClientRequestScheduler sched = getter.getScheduler(context);
+ DatastoreCheckerItem item = new DatastoreCheckerItem(getter,
context.nodeDBHandle, prio, blocks);
+ container.set(item);
+ container.activate(blocks, 5);
+ synchronized(this) {
+ // FIXME only add if queue not full.
+ int queueSize = 0;
+ // Only count queued keys at no higher priority than
this request.
+ for(short p = 0;p<=prio;p++) {
+ for(int x = 0;x<persistentKeys[p].size();x++) {
+ queueSize +=
persistentKeys[p].get(x).length;
+ }
+ }
+ if(queueSize > MAX_PERSISTENT_KEYS) return;
+ item.chosenBy = context.bootID;
+ container.set(item);
+ // FIXME check using store.probablyInStore
+ ArrayList<Key> finalKeysToCheck = new ArrayList<Key>();
+ for(Key key : checkKeys) {
+ finalKeysToCheck.add(key);
+ }
+ persistentGetters[prio].add(getter);
+ persistentKeys[prio].add(finalKeysToCheck.toArray(new
Key[finalKeysToCheck.size()]));
+ persistentDontCache[prio].add(dontCache);
+ persistentSchedulers[prio].add(sched);
+ persistentCheckerItems[prio].add(item);
+ persistentBlockSets[prio].add(blocks);
+ trimPersistentQueue(prio, container);
+ notifyAll();
+ }
+ }
+
+ public void run() {
+ while(true) {
+ try {
+ realRun();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t+" in datastore
checker thread", t);
+ }
+ }
+ }
+
+ private void realRun() {
+ boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ Key[] keys = null;
+ SendableGet getter = null;
+ boolean persistent = false;
+ boolean dontCache = false;
+ ClientRequestScheduler sched = null;
+ DatastoreCheckerItem item = null;
+ BlockSet blocks = null;
+ short priority = -1;
+ synchronized(this) {
+ while(true) {
+ for(short prio =
0;prio<transientKeys.length;prio++) {
+ if(!transientKeys[prio].isEmpty()) {
+ keys =
transientKeys[prio].remove(0);
+ getter =
transientGetters[prio].remove(0);
+ persistent = false;
+ item = null;
+ blocks =
transientBlockSets[prio].remove(0);
+ priority = prio;
+ break;
+ } else
if(!persistentGetters[prio].isEmpty()) {
+ keys =
persistentKeys[prio].remove(0);
+ getter =
persistentGetters[prio].remove(0);
+ persistent = true;
+ dontCache =
persistentDontCache[prio].remove(0);
+ sched =
persistentSchedulers[prio].remove(0);
+ item =
persistentCheckerItems[prio].remove(0);
+ blocks =
persistentBlockSets[prio].remove(0);
+ priority = prio;
+ break;
+ }
+ }
+ if(keys == null) {
+ try {
+ wait(100*1000);
+ } catch (InterruptedException e) {
+ // Ok
+ }
+ context.jobRunner.queue(loader,
NativeThread.HIGH_PRIORITY, true);
+ continue;
+ }
+ break;
+ }
+ }
+ if(!persistent) {
+ dontCache = getter.dontCache();
+ sched = getter.getScheduler(context);
+ }
+ boolean anyValid = false;
+ for(Key key : keys) {
+ KeyBlock block = null;
+ if(blocks != null)
+ block = blocks.get(key);
+ if(blocks == null)
+ block = node.fetch(key, dontCache);
+ if(block != null) {
+ if(logMINOR) Logger.minor(this, "Found key");
+ if(key instanceof NodeSSK)
+ sched.tripPendingKey(block);
+ else // CHK
+ sched.tripPendingKey(block);
+ } else {
+ anyValid = true;
+ }
+// synchronized(this) {
+// keysToCheck[priority].remove(key);
+// }
+ }
+ if(persistent)
+ context.jobRunner.queue(loader,
NativeThread.HIGH_PRIORITY, true);
+ if(persistent) {
+ final SendableGet get = getter;
+ final ClientRequestScheduler scheduler = sched;
+ final boolean valid = anyValid;
+ final DatastoreCheckerItem it = item;
+ context.jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer container,
ClientContext context) {
+ scheduler.finishRegister(new
SendableGet[] { get }, true, true, valid, it);
+ loader.run(container, context);
+ }
+
+ }, NativeThread.NORM_PRIORITY, false);
+ } else {
+ sched.finishRegister(new SendableGet[] { getter },
false, false, anyValid, item);
+ }
+ }
+
+ synchronized void wakeUp() {
+ notifyAll();
+ }
+
+ public void start(Executor executor, String name) {
+ context.jobRunner.queue(loader, NativeThread.HIGH_PRIORITY-1,
true);
+ executor.execute(this, name);
+ }
+
+ public int getPriority() {
+ return NativeThread.NORM_PRIORITY;
+ }
+
+}
Added: branches/db4o/freenet/src/freenet/client/async/DatastoreCheckerItem.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DatastoreCheckerItem.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/DatastoreCheckerItem.java
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,26 @@
+package freenet.client.async;
+
+import freenet.node.SendableGet;
+
+/**
+ * Persistent tag for a persistent request which needs to check the datastore
+ * and then be registered.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ *
+ */
+public class DatastoreCheckerItem {
+
+ final long nodeDBHandle;
+ final SendableGet getter;
+ final short prio;
+ long chosenBy;
+ final BlockSet blocks;
+
+ DatastoreCheckerItem(SendableGet getter, long nodeDBHandle, short prio,
BlockSet blocks) {
+ this.getter = getter;
+ this.nodeDBHandle = nodeDBHandle;
+ this.prio = prio;
+ this.blocks = blocks;
+ }
+
+}
Added: branches/db4o/freenet/src/freenet/client/async/HasKeyListener.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/HasKeyListener.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/HasKeyListener.java
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,30 @@
+package freenet.client.async;
+
+import java.io.IOException;
+
+import com.db4o.ObjectContainer;
+
/**
 * Interface to show that we can create a KeyListener callback: implemented by
 * requests which want to be told when their keys turn up.
 * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
 */
public interface HasKeyListener {

	/**
	 * Create a KeyListener, a transient object used to determine which keys we
	 * want, and to handle any blocks found.
	 * @param container The database, in case persistent state must be activated.
	 * @param context The client context.
	 * @return The new KeyListener, or null if the HasKeyListener is
	 * finished/cancelled/etc.
	 * @throws KeyListenerConstructionException If creating the listener fails,
	 * e.g. because its backing data cannot be read.
	 */
	KeyListener makeKeyListener(ObjectContainer container, ClientContext context) throws KeyListenerConstructionException;

	/**
	 * Is it cancelled?
	 * @param container The database, in case persistent state must be activated.
	 */
	boolean isCancelled(ObjectContainer container);

	/**
	 * Notify that makeKeyListener() failed, so the implementor can clean up
	 * and report the error.
	 */
	void onFailed(KeyListenerConstructionException e, ObjectContainer container, ClientContext context);
}
Added: branches/db4o/freenet/src/freenet/client/async/KeyListener.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/KeyListener.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/KeyListener.java
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,93 @@
+package freenet.client.async;
+
+import com.db4o.ObjectContainer;
+
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.node.SendableGet;
+
/**
 * Transient object created on startup for persistent requests (or at creation
 * time for non-persistent requests), to monitor the stream of successfully
 * fetched keys. If a key appears interesting, we schedule a job on the database
 * thread to double-check and process the data if we still want it.
 * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
 *
 * saltedKey is the routing key from the key, salted globally (concat a global
 * salt value and then SHA) in order to save some cycles. Implementations that
 * use two internal bloom filters may need to have an additional local salt, as
 * in SplitFileFetcherKeyListener.
 */
public interface KeyListener {

	/**
	 * Fast guess at whether we want a key or not. Usually implemented by a
	 * bloom filter.
	 * LOCKING: Should avoid external locking if possible. Will be called
	 * within the CRSBase lock.
	 * @return True if we probably want the key. False if we definitely don't
	 * want it.
	 */
	public boolean probablyWantKey(Key key, byte[] saltedKey);

	/**
	 * Do we want the key? This is called by the ULPR code, because fetching the
	 * key will involve significant work. tripPendingKey() on the other hand
	 * will go straight to handleBlock().
	 * @return -1 if we don't want the key, otherwise the priority of the request
	 * interested in the key.
	 */
	public short definitelyWantKey(Key key, byte[] saltedKey, ObjectContainer container, ClientContext context);

	/**
	 * Find the requests related to a specific key, used in retrying after cooldown.
	 * Caller should call probablyWantKey() first.
	 * @return The requests interested in the key, or null/empty if none.
	 */
	public SendableGet[] getRequestsForKey(Key key, byte[] saltedKey, ObjectContainer container, ClientContext context);

	/**
	 * Handle the found data, if we really want it.
	 * @return True if the block was wanted and processed.
	 */
	public boolean handleBlock(Key key, byte[] saltedKey, KeyBlock found, ObjectContainer container, ClientContext context);

	/**
	 * Is this related to a persistent request?
	 */
	boolean persistent();

	/**
	 * Priority of the associated request.
	 * LOCKING: Should avoid external locking if possible. Will be called
	 * within the CRSBase lock.
	 * @param container Database handle.
	 */
	short getPriorityClass(ObjectContainer container);

	/**
	 * @return True if when checking the datastore on initial registration, we
	 * should not promote any blocks found.
	 */
	public abstract boolean dontCache();

	/** @return The number of keys this listener is still interested in. */
	public long countKeys();

	/**
	 * @return The parent HasKeyListener. This does mean it will be pinned in
	 * RAM, but it can be deactivated so it's not a big deal.
	 * LOCKING: Should avoid external locking if possible. Will be called
	 * within the CRSBase lock.
	 */
	public HasKeyListener getHasKeyListener();

	/**
	 * Deactivate the request once it has been removed.
	 */
	public void onRemove();

	/**
	 * Has the request finished? If every key has been found, or enough keys have
	 * been found, return true so that the caller can remove it from the list.
	 */
	public boolean isEmpty();

}
Added:
branches/db4o/freenet/src/freenet/client/async/KeyListenerConstructionException.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/KeyListenerConstructionException.java
(rev 0)
+++
branches/db4o/freenet/src/freenet/client/async/KeyListenerConstructionException.java
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,20 @@
+package freenet.client.async;
+
+import freenet.client.FetchException;
+
+/**
+ * Thrown when creating a KeyListener fails.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ *
+ */
+class KeyListenerConstructionException extends Exception {
+
+ KeyListenerConstructionException(FetchException e) {
+ super(e);
+ }
+
+ public FetchException getFetchException() {
+ return (FetchException) getCause();
+ }
+
+}
Added: branches/db4o/freenet/src/freenet/client/async/SingleKeyListener.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleKeyListener.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/SingleKeyListener.java
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,90 @@
+package freenet.client.async;
+
+import com.db4o.ObjectContainer;
+
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.node.SendableGet;
+
+public class SingleKeyListener implements KeyListener {
+
+ private final Key key;
+ private final BaseSingleFileFetcher fetcher;
+ private final boolean dontCache;
+ private boolean done;
+ private short prio;
+ private final boolean persistent;
+
+ public SingleKeyListener(Key key, BaseSingleFileFetcher fetcher,
boolean dontCache, short prio, boolean persistent) {
+ this.key = key;
+ this.fetcher = fetcher;
+ this.dontCache = dontCache;
+ this.prio = prio;
+ this.persistent = persistent;
+ }
+
+ public long countKeys() {
+ if(done) return 0;
+ else return 1;
+ }
+
+ public short definitelyWantKey(Key key, byte[] saltedKey,
ObjectContainer container,
+ ClientContext context) {
+ if(!key.equals(this.key)) return -1;
+ else return prio;
+ }
+
+ public boolean dontCache() {
+ return dontCache;
+ }
+
+ public HasKeyListener getHasKeyListener() {
+ return fetcher;
+ }
+
+ public short getPriorityClass(ObjectContainer container) {
+ return prio;
+ }
+
+ public SendableGet[] getRequestsForKey(Key key, byte[] saltedKey,
ObjectContainer container,
+ ClientContext context) {
+ if(!key.equals(this.key)) return null;
+ return new SendableGet[] { fetcher };
+ }
+
+ public boolean handleBlock(Key key, byte[] saltedKey, KeyBlock found,
+ ObjectContainer container, ClientContext context) {
+ if(!key.equals(this.key)) return false;
+ if(persistent)
+ container.activate(fetcher, 1);
+ fetcher.onGotKey(key, found, container, context);
+ if(persistent)
+ container.deactivate(fetcher, 1);
+ synchronized(this) {
+ done = true;
+ }
+ return true;
+ }
+
+ public Key[] listKeys(ObjectContainer container) {
+ return new Key[] { key };
+ }
+
+ public boolean persistent() {
+ return persistent;
+ }
+
+ public boolean probablyWantKey(Key key, byte[] saltedKey) {
+ if(done) return false;
+ return key.equals(this.key);
+ }
+
+ public synchronized void onRemove() {
+ done = true;
+ }
+
+ public boolean isEmpty() {
+ return done;
+ }
+
+}
Added:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
(rev 0)
+++
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,308 @@
+package freenet.client.async;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+
+import com.db4o.ObjectContainer;
+
+import freenet.crypt.SHA256;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.node.PrioRunnable;
+import freenet.node.SendableGet;
+import freenet.support.BinaryBloomFilter;
+import freenet.support.CountingBloomFilter;
+import freenet.support.Logger;
+import freenet.support.io.NativeThread;
+
+/**
+ * KeyListener implementation for SplitFileFetcher.
+ * Details:
+ * - We have a bloom filter. This is kept in RAM, but stored in a file. It is a
+ * counting filter which is created with the splitfile; when a block is
+ * completed, it is removed from the filter, and we schedule a write after a
+ * certain period of time (we ensure that the write doesn't happen before
that).
+ * Hence even on a fast node, we won't have to write the filter so frequently
+ * as to be a problem. We could use mmap'ed filters, but that might also be a
+ * problem with fd's.
+ * - When a block is actually found, on the database thread, we load the per-
+ * segment bloom filters from the SplitFileFetcher, and thus determine which
+ * segment it belongs to. These are non-counting and static.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ *
+ * LOCKING: Synchronize when changing something, and writing to disk.
+ * Don't need to synchronize on read in most cases, at least for sane
+ * BloomFilter implementations (that is, counting with counting width less than
+ * and divisible into 8).
+ */
+public class SplitFileFetcherKeyListener implements KeyListener {
+
+ private final SplitFileFetcher fetcher;
+ private final boolean persistent;
+ private int keyCount;
+ private final byte[] filterBuffer;
+ private final CountingBloomFilter filter;
+ /** All the segment's bloom filters, stuck together into a single blob
+ * so can be read/written en bloc */
+ private final byte[] segmentsFilterBuffer;
+ private final BinaryBloomFilter[] segmentFilters;
+ /** We store the Bloom filter to this file, but we don't map it, since
we
+ * can't generally afford the fd's. */
+ private final File mainBloomFile;
+ /** Stores Bloom filters for every segment. */
+ private final File altBloomFile;
+ /** Wait for this period for new data to come in before writing the
filter.
+ * The filter is only ever subtracted from, so if we crash we just have
a
+ * few more false positives. On a fast node with slow disk, writing on
every
+ * completed block could become a major bottleneck. */
+ private static final int WRITE_DELAY = 60*1000;
+ private final boolean dontCache;
+ private short prio;
+ /** Used only if we reach the per-segment bloom filters. The overall
bloom
+ * filters use the global salt. */
+ private final byte[] localSalt;
+ private boolean killed;
+
+ /**
+ * Caller must create bloomFile, but it may be empty.
+ * @param newFilter If true, the bloom file is empty, and the bloom
filter
+ * should be created from scratch.
+ * @throws IOException
+ */
+ public SplitFileFetcherKeyListener(SplitFileFetcher parent, int
keyCount, File bloomFile, File altBloomFile, int mainBloomSizeBytes, int
mainBloomK, boolean dontCache, byte[] localSalt, int segments, int
segmentFilterSizeBytes, int segmentBloomK, boolean persistent, boolean
newFilter) throws IOException {
+ fetcher = parent;
+ this.persistent = persistent;
+ this.keyCount = keyCount;
+ this.mainBloomFile = persistent ? new File(bloomFile.getPath())
: null;
+ this.altBloomFile = persistent ? new
File(altBloomFile.getPath()) : null;
+ this.dontCache = dontCache;
+ assert(localSalt.length == 32);
+ if(persistent) {
+ this.localSalt = new byte[32];
+ System.arraycopy(localSalt, 0, this.localSalt, 0, 32);
+ } else {
+ this.localSalt = localSalt;
+ }
+ segmentsFilterBuffer = new byte[segmentFilterSizeBytes *
segments];
+ ByteBuffer baseBuffer = ByteBuffer.wrap(segmentsFilterBuffer);
+ segmentFilters = new BinaryBloomFilter[segments];
+ int start = 0;
+ int end = segmentFilterSizeBytes;
+ for(int i=0;i<segments;i++) {
+ baseBuffer.position(start);
+ baseBuffer.limit(end);
+ ByteBuffer slice = baseBuffer.slice();
+ segmentFilters[i] = new BinaryBloomFilter(slice,
segmentFilterSizeBytes * 8, segmentBloomK);
+ start += segmentFilterSizeBytes;
+ end += segmentFilterSizeBytes;
+ }
+
+ filterBuffer = new byte[mainBloomSizeBytes];
+ if(newFilter) {
+ filter = new CountingBloomFilter(mainBloomSizeBytes * 8
/ 2, mainBloomK, filterBuffer);
+ } else {
+ // Read from file.
+ FileInputStream fis = new FileInputStream(bloomFile);
+ DataInputStream dis = new DataInputStream(fis);
+ dis.readFully(filterBuffer);
+ dis.close();
+ filter = new CountingBloomFilter(mainBloomSizeBytes * 8
/ 2, mainBloomK, filterBuffer);
+ fis = new FileInputStream(altBloomFile);
+ dis = new DataInputStream(fis);
+ dis.readFully(segmentsFilterBuffer);
+ dis.close();
+ }
+ }
+
+ public long countKeys() {
+ return keyCount;
+ }
+
+ /**
+ * SplitFileFetcher adds keys in whatever blocks are convenient.
+ * @param keys
+ */
+ void addKey(Key key, int segNo, ClientContext context) {
+ byte[] saltedKey = context.getChkFetchScheduler().saltKey(key);
+ filter.addKey(saltedKey);
+ segmentFilters[segNo].addKey(localSaltKey(key));
+ }
+
+ private byte[] localSaltKey(Key key) {
+ MessageDigest md = SHA256.getMessageDigest();
+ md.update(key.getRoutingKey());
+ md.update(localSalt);
+ byte[] ret = md.digest();
+ SHA256.returnMessageDigest(md);
+ return ret;
+ }
+
+ public boolean probablyWantKey(Key key, byte[] saltedKey) {
+ return filter.checkFilter(saltedKey);
+ }
+
+ public short definitelyWantKey(Key key, byte[] saltedKey,
ObjectContainer container,
+ ClientContext context) {
+ // Caller has already called probablyWantKey(), so don't do it
again.
+ byte[] salted = localSaltKey(key);
+ for(int i=0;i<segmentFilters.length;i++) {
+ if(segmentFilters[i].checkFilter(salted)) return prio;
+ }
+ return -1;
+ }
+
+ public boolean handleBlock(Key key, byte[] saltedKey, KeyBlock block,
+ ObjectContainer container, ClientContext context) {
+ // Caller has already called probablyWantKey(), so don't do it
again.
+ boolean found = false;
+ byte[] salted = localSaltKey(key);
+ for(int i=0;i<segmentFilters.length;i++) {
+ if(segmentFilters[i].checkFilter(salted)) {
+ if(persistent)
+ container.activate(fetcher, 1);
+ SplitFileFetcherSegment segment =
fetcher.getSegment(i);
+ if(persistent)
+ container.activate(segment, 1);
+ if(segment.onGotKey(key, block, container,
context)) {
+ keyCount--;
+ synchronized(this) {
+ filter.removeKey(saltedKey);
+ }
+ // Update the persistent keyCount.
+ if(persistent)
+ container.activate(fetcher, 1);
+ fetcher.setKeyCount(keyCount,
container);
+ if(persistent)
+ container.deactivate(fetcher,
1);
+ found = true;
+ }
+ if(persistent)
+ container.deactivate(segment, 1);
+ }
+ }
+ return found;
+ }
+
+ public boolean dontCache() {
+ return dontCache;
+ }
+
+ public HasKeyListener getHasKeyListener() {
+ return fetcher;
+ }
+
+ public short getPriorityClass(ObjectContainer container) {
+ return prio;
+ }
+
+ public SendableGet[] getRequestsForKey(Key key, byte[] saltedKey,
+ ObjectContainer container, ClientContext context) {
+ ArrayList<SendableGet> ret = new ArrayList<SendableGet>();
+ // Caller has already called probablyWantKey(), so don't do it
again.
+ byte[] salted = localSaltKey(key);
+ for(int i=0;i<segmentFilters.length;i++) {
+ if(segmentFilters[i].checkFilter(salted)) {
+ if(persistent)
+ container.activate(fetcher, 1);
+ SplitFileFetcherSegment segment =
fetcher.getSegment(i);
+ int blockNum = segment.getBlockNumber(key,
container);
+ if(blockNum >= 0) {
+
ret.add(segment.getSubSegmentFor(blockNum, container));
+ }
+ }
+ }
+ return ret.toArray(new SendableGet[ret.size()]);
+ }
+
+ public void onRemove() {
+ synchronized(this) {
+ killed = true;
+ }
+ if(persistent) {
+ mainBloomFile.delete();
+ altBloomFile.delete();
+ }
+ }
+
+ public boolean persistent() {
+ return persistent;
+ }
+
+ public void writeFilters() throws IOException {
+ if(!persistent) return;
+ RandomAccessFile raf = new RandomAccessFile(mainBloomFile,
"rw");
+ raf.write(filterBuffer);
+ raf.close();
+ raf = new RandomAccessFile(altBloomFile, "rw");
+ raf.write(segmentsFilterBuffer);
+ raf.close();
+ }
+
+ public synchronized int killSegment(SplitFileFetcherSegment segment,
ObjectContainer container, ClientContext context) {
+ int segNo = segment.segNum;
+ segmentFilters[segNo].unsetAll();
+ Key[] removeKeys = segment.listKeys(container);
+ for(int i=0;i<removeKeys.length;i++) {
+ byte[] salted =
context.getChkFetchScheduler().saltKey(removeKeys[i]);
+ if(filter.checkFilter(salted)) {
+ filter.removeKey(salted);
+ } else
+ // Huh??
+ Logger.error(this, "Removing key
"+removeKeys[i]+" from "+segment+" : NOT IN BLOOM FILTER!");
+ }
+ scheduleWriteFilters(context);
+ return keyCount -= removeKeys.length;
+ }
+
+ private boolean writingBloomFilter;
+
+ /** Arrange to write the filters, at some point after this transaction
is
+ * committed. */
+ private void scheduleWriteFilters(ClientContext context) {
+ synchronized(this) {
+ // Worst case, we end up blocking the database thread
while a write completes off thread.
+ // Common case, the write executes on a separate thread.
+ // Don't run the write at too low a priority or we may
get priority inversion.
+ if(writingBloomFilter) return;
+ writingBloomFilter = true;
+ try {
+ context.ticker.queueTimedJob(new PrioRunnable()
{
+
+ public void run() {
+
synchronized(SplitFileFetcherKeyListener.this) {
+ try {
+ writeFilters();
+ } catch (IOException e)
{
+
Logger.error(this, "Failed to write bloom filters, we will have more false
positives on already-found blocks which aren't in the store: "+e, e);
+ } finally {
+
writingBloomFilter = true;
+ }
+ }
+ }
+
+ public int getPriority() {
+ // Don't run the write at too
low a priority or we may get priority inversion.
+ return
NativeThread.HIGH_PRIORITY;
+ }
+
+ }, WRITE_DELAY);
+ } catch (Throwable t) {
+ writingBloomFilter = false;
+ }
+ }
+ }
+
+ public boolean isEmpty() {
+ // FIXME: We rely on SplitFileFetcher unregistering itself.
+ // Maybe we should keep track of how many segments have been
cleared?
+ // We'd have to be sure that they weren't cleared twice...?
+ return killed;
+ }
+
+}