Author: toad
Date: 2008-08-21 12:37:19 +0000 (Thu, 21 Aug 2008)
New Revision: 22070

Added:
   branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java
   branches/db4o/freenet/src/freenet/client/async/DatastoreCheckerItem.java
   branches/db4o/freenet/src/freenet/client/async/HasKeyListener.java
   branches/db4o/freenet/src/freenet/client/async/KeyListener.java
   
branches/db4o/freenet/src/freenet/client/async/KeyListenerConstructionException.java
   branches/db4o/freenet/src/freenet/client/async/SingleKeyListener.java
   
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
Log:
AAAAAAAAAAAARGH! Missing files!


Added: branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java        
                        (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java        
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,385 @@
+package freenet.client.async;
+
+import java.util.ArrayList;
+
+import com.db4o.ObjectContainer;
+import com.db4o.ObjectSet;
+import com.db4o.query.Query;
+
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.keys.NodeSSK;
+import freenet.node.Node;
+import freenet.node.PrioRunnable;
+import freenet.node.RequestStarter;
+import freenet.node.SendableGet;
+import freenet.support.Executor;
+import freenet.support.Logger;
+import freenet.support.io.NativeThread;
+
+/**
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ */
+public class DatastoreChecker implements PrioRunnable {
+       
+       /** Limit on the number of keys held in the in-memory persistent queues;
+        * checked by loadPersistentRequests(), queuePersistentRequest() and
+        * trimPersistentQueue(). */
+       static final int MAX_PERSISTENT_KEYS = 1024;
+
+       // Every queue below is an array with one list per priority class (sized by
+       // RequestStarter.NUMBER_OF_PRIORITY_CLASSES in the constructor). Within one
+       // priority the persistent* lists are kept in step: the entries at a given
+       // position in each list belong to the same request. The same holds for
+       // the transient* lists.
+
+       /** List of arrays of keys to check for persistent requests. PARTIAL:
+        * When we run out we will look up some more DatastoreCheckerItem's. */
+       private final ArrayList<Key[]>[] persistentKeys;
+       /** List of persistent requests which we will call finishRegister() for
+        * when we have checked the keys lists. PARTIAL: When we run out we
+        * will look up some more DatastoreCheckerItem's. Deactivated. */
+       private final ArrayList<SendableGet>[] persistentGetters;
+       /** dontCache() value recorded for each queued persistent request. */
+       private final ArrayList<Boolean>[] persistentDontCache;
+       /** Scheduler recorded for each queued persistent request. */
+       private final ArrayList<ClientRequestScheduler>[] persistentSchedulers;
+       /** Database tag for each queued persistent request. */
+       private final ArrayList<DatastoreCheckerItem>[] persistentCheckerItems;
+       /** BlockSet (possibly null) supplied with each queued persistent request. */
+       private final ArrayList<BlockSet>[] persistentBlockSets;
+
+       /** List of arrays of keys to check for transient requests. */
+       private final ArrayList<Key[]>[] transientKeys;
+       /** List of transient requests which we will call finishRegister() for
+        * when we have checked the keys lists. */
+       private final ArrayList<SendableGet>[] transientGetters;
+       /** BlockSet (possibly null) supplied with each queued transient request. */
+       private final ArrayList<BlockSet>[] transientBlockSets;
+
+       /** Client context; assigned by setContext(). */
+       private ClientContext context;
+       /** Node used by realRun() to fetch blocks for the queued keys. */
+       private final Node node;
+       
+       /**
+        * Install the client context used by this checker.
+        * @param context The context to use from now on.
+        */
+       public void setContext(ClientContext context) {
+               synchronized(this) {
+                       this.context = context;
+               }
+       }
+
+       /**
+        * Create a checker whose queues are all empty, with one list per
+        * priority class in every queue.
+        * @param node The node used later to fetch blocks for queued keys.
+        */
+       public DatastoreChecker(Node node) {
+               this.node = node;
+               final int classes = RequestStarter.NUMBER_OF_PRIORITY_CLASSES;
+               // Allocate the per-priority arrays first ...
+               persistentKeys = new ArrayList[classes];
+               persistentGetters = new ArrayList[classes];
+               persistentDontCache = new ArrayList[classes];
+               persistentSchedulers = new ArrayList[classes];
+               persistentCheckerItems = new ArrayList[classes];
+               persistentBlockSets = new ArrayList[classes];
+               transientKeys = new ArrayList[classes];
+               transientGetters = new ArrayList[classes];
+               transientBlockSets = new ArrayList[classes];
+               // ... then give every priority class its own empty list in each queue.
+               for(int c = 0; c < classes; c++) {
+                       persistentKeys[c] = new ArrayList<Key[]>();
+                       persistentGetters[c] = new ArrayList<SendableGet>();
+                       persistentDontCache[c] = new ArrayList<Boolean>();
+                       persistentSchedulers[c] = new ArrayList<ClientRequestScheduler>();
+                       persistentCheckerItems[c] = new ArrayList<DatastoreCheckerItem>();
+                       persistentBlockSets[c] = new ArrayList<BlockSet>();
+                       transientKeys[c] = new ArrayList<Key[]>();
+                       transientGetters[c] = new ArrayList<SendableGet>();
+                       transientBlockSets[c] = new ArrayList<BlockSet>();
+               }
+       }
+       
+       /**
+        * Database job that refills the in-memory persistent queues by calling
+        * loadPersistentRequests() with the container and context it is given.
+        */
+       private final DBJob loader =  new DBJob() {
+
+               public void run(ObjectContainer container, ClientContext context) {
+                       loadPersistentRequests(container, context);
+               }
+
+       };
+       
+       /**
+        * Load stored DatastoreCheckerItem's from the database into the in-memory
+        * persistent queues, walking the priority classes from
+        * MAXIMUM_PRIORITY_CLASS to MINIMUM_PRIORITY_CLASS inclusive. Does nothing
+        * if the queues already hold more than MAX_PERSISTENT_KEYS keys. Items
+        * whose chosenBy equals this boot's ID, and getters which are already
+        * queued, are skipped.
+        * @param container Database handle, used to query, activate and store.
+        * @param context Supplies the nodeDBHandle and bootID used to select
+        * and tag items.
+        */
+       public void loadPersistentRequests(ObjectContainer container, final ClientContext context) {
+               boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               int totalSize = 0;
+               synchronized(this) {
+                       for(int i=0;i<persistentKeys.length;i++) {
+                               for(int j=0;j<persistentKeys[i].size();j++)
+                                       totalSize += persistentKeys[i].get(j).length;
+                       }
+                       if(totalSize > MAX_PERSISTENT_KEYS) {
+                               if(logMINOR) Logger.minor(this, "Persistent datastore checker queue alreadyfull");
+                               return;
+                       }
+               }
+               // The bound is inclusive so that the lowest priority class is loaded too;
+               // the queues are sized for every class and realRun() drains all of them.
+               // NOTE(review): assumes MINIMUM_PRIORITY_CLASS is the last valid index,
+               // i.e. NUMBER_OF_PRIORITY_CLASSES - 1 - confirm against RequestStarter.
+               for(short p = RequestStarter.MAXIMUM_PRIORITY_CLASS; p <= RequestStarter.MINIMUM_PRIORITY_CLASS; p++) {
+                       final short prio = p;
+                       Query query = container.query();
+                       query.constrain(DatastoreCheckerItem.class);
+                       query.descend("nodeDBHandle").constrain(context.nodeDBHandle).
+                               and(query.descend("prio").constrain(prio));
+                       ObjectSet<DatastoreCheckerItem> results = query.execute();
+                       for(DatastoreCheckerItem item : results) {
+                               // Already picked up during this run of the node.
+                               if(item.chosenBy == context.bootID) continue;
+                               SendableGet getter = item.getter;
+                               BlockSet blocks = item.blocks;
+                               container.activate(getter, 1);
+                               boolean dontCache = getter.dontCache();
+                               ClientRequestScheduler sched = getter.getScheduler(context);
+                               synchronized(this) {
+                                       if(persistentGetters[prio].contains(getter)) continue;
+                               }
+                               Key[] keys = getter.listKeys(container);
+                               // FIXME check the store bloom filter using store.probablyInStore().
+                               item.chosenBy = context.bootID;
+                               container.set(item);
+                               synchronized(this) {
+                                       // Re-check: the lock was released while listing the keys.
+                                       if(persistentGetters[prio].contains(getter)) continue;
+                                       // Queue clones of the keys rather than the getter's own objects.
+                                       Key[] finalKeys = new Key[keys.length];
+                                       for(int k=0;k<keys.length;k++)
+                                               finalKeys[k] = keys[k].cloneKey();
+                                       persistentKeys[prio].add(finalKeys);
+                                       persistentGetters[prio].add(getter);
+                                       persistentDontCache[prio].add(dontCache);
+                                       persistentSchedulers[prio].add(sched);
+                                       persistentCheckerItems[prio].add(item);
+                                       persistentBlockSets[prio].add(blocks);
+                                       // The queue was empty, so the checker thread may be waiting.
+                                       if(totalSize == 0)
+                                               notifyAll();
+                                       totalSize += finalKeys.length;
+                                       if(totalSize > MAX_PERSISTENT_KEYS) {
+                                               if(trimPersistentQueue(prio, container)) return;
+                                               notifyAll();
+                                       }
+                               }
+                               container.deactivate(getter, 1);
+                       }
+               }
+       }
+       
+       /**
+        * Drop queued persistent requests at priority indices above prio until
+        * the number of keys queued at the other priorities falls below
+        * MAX_PERSISTENT_KEYS. Dropped items have chosenBy reset to 0 and are
+        * stored again. Keys queued at prio itself are not counted and never
+        * dropped.
+        * @param prio Only entries at priority indices greater than this one
+        * are dropped.
+        * @param container Database handle used to store the released items.
+        * @return False if the counted size is (or has been brought) below the
+        * limit, true if it is still at or over the limit.
+        */
+       private boolean trimPersistentQueue(short prio, ObjectContainer container) {
+               synchronized(this) {
+                       int preQueueSize = 0;
+                       for(int i=0;i<prio;i++) {
+                               for(int x=0;x<persistentKeys[i].size();x++)
+                                       preQueueSize += persistentKeys[i].get(x).length;
+                       }
+                       if(preQueueSize > MAX_PERSISTENT_KEYS) {
+                               // The entries before prio alone exceed the limit: dump
+                               // everything queued after prio.
+                               for(int i=prio+1;i<persistentKeys.length;i++) {
+                                       while(!persistentKeys[i].isEmpty())
+                                               dropLastPersistent(i, container);
+                               }
+                               return true;
+                       }
+                       int postQueueSize = 0;
+                       for(int i=prio+1;i<persistentKeys.length;i++) {
+                               for(int x=0;x<persistentKeys[i].size();x++)
+                                       postQueueSize += persistentKeys[i].get(x).length;
+                       }
+                       if(postQueueSize + preQueueSize < MAX_PERSISTENT_KEYS)
+                               return false;
+                       // Need to dump some stuff, starting from the last priority index.
+                       for(int i=persistentKeys.length-1;i>prio;i--) {
+                               while(!persistentKeys[i].isEmpty()) {
+                                       // Keep a running total so that we stop as soon as we are
+                                       // below the limit, instead of judging each entry in isolation.
+                                       postQueueSize -= dropLastPersistent(i, container);
+                                       if(postQueueSize + preQueueSize < MAX_PERSISTENT_KEYS)
+                                               return false;
+                               }
+                       }
+                       // Still over the limit.
+                       return true;
+               }
+       }
+
+       /**
+        * Remove the last entry queued at the given priority from all of the
+        * persistent lists and release its database item (chosenBy = 0).
+        * Caller must hold the lock on this object.
+        * @return The number of keys the removed entry held.
+        */
+       private int dropLastPersistent(int prio, ObjectContainer container) {
+               int idx = persistentKeys[prio].size() - 1;
+               DatastoreCheckerItem item = persistentCheckerItems[prio].remove(idx);
+               persistentSchedulers[prio].remove(idx);
+               persistentDontCache[prio].remove(idx);
+               persistentGetters[prio].remove(idx);
+               Key[] keys = persistentKeys[prio].remove(idx);
+               persistentBlockSets[prio].remove(idx);
+               item.chosenBy = 0;
+               container.set(item);
+               return keys.length;
+       }
+       
+       /**
+        * Queue a transient (non-persistent) request for checking and wake the
+        * checker thread.
+        * @param getter The request; its keys and priority class are read here.
+        * @param blocks BlockSet stored alongside the request.
+        */
+       public void queueTransientRequest(SendableGet getter, BlockSet blocks) {
+               final Key[] listed = getter.listKeys(null);
+               final short priorityClass = getter.getPriorityClass(null);
+               // FIXME check using store.probablyInStore
+               synchronized(this) {
+                       // Queue our own array rather than the one the getter handed out.
+                       Key[] snapshot = new Key[listed.length];
+                       System.arraycopy(listed, 0, snapshot, 0, listed.length);
+                       transientGetters[priorityClass].add(getter);
+                       transientKeys[priorityClass].add(snapshot);
+                       transientBlockSets[priorityClass].add(blocks);
+                       notifyAll();
+               }
+       }
+       
+       /**
+        * Queue a persistent request. A DatastoreCheckerItem is stored first;
+        * the datastore is then checked on the datastore checker thread, and
+        * finishRegister() is called on the database thread. Caller must have
+        * already stored and registered the HasKeyListener if any.
+        * @param getter The request to queue.
+        * @param blocks BlockSet stored with the item; activated to depth 5 here.
+        * @param container Database handle.
+        */
+       public void queuePersistentRequest(SendableGet getter, BlockSet blocks, ObjectContainer container) {
+               final Key[] listed = getter.listKeys(container);
+               final short priorityClass = getter.getPriorityClass(container);
+               final boolean noCache = getter.dontCache();
+               final ClientRequestScheduler scheduler = getter.getScheduler(context);
+               final DatastoreCheckerItem tag =
+                       new DatastoreCheckerItem(getter, context.nodeDBHandle, priorityClass, blocks);
+               container.set(tag);
+               container.activate(blocks, 5);
+               synchronized(this) {
+                       // FIXME only add if queue not full.
+                       // Count the keys queued at priority indices 0..priorityClass inclusive.
+                       int queued = 0;
+                       for(short p = 0; p <= priorityClass; p++) {
+                               for(Key[] batch : persistentKeys[p])
+                                       queued += batch.length;
+                       }
+                       // Over the limit: leave the stored item for loadPersistentRequests().
+                       if(queued > MAX_PERSISTENT_KEYS) return;
+                       tag.chosenBy = context.bootID;
+                       container.set(tag);
+                       // FIXME check using store.probablyInStore
+                       Key[] snapshot = new Key[listed.length];
+                       System.arraycopy(listed, 0, snapshot, 0, listed.length);
+                       persistentGetters[priorityClass].add(getter);
+                       persistentKeys[priorityClass].add(snapshot);
+                       persistentDontCache[priorityClass].add(noCache);
+                       persistentSchedulers[priorityClass].add(scheduler);
+                       persistentCheckerItems[priorityClass].add(tag);
+                       persistentBlockSets[priorityClass].add(blocks);
+                       trimPersistentQueue(priorityClass, container);
+                       notifyAll();
+               }
+       }
+
+       /**
+        * Checker thread main loop: runs realRun() forever, logging anything
+        * it throws and carrying on.
+        */
+       public void run() {
+               for(;;) {
+                       try {
+                               realRun();
+                       } catch (Throwable t) {
+                               Logger.error(this, "Caught "+t+" in datastore checker thread", t);
+                       }
+               }
+       }
+
+       /**
+        * Take one queued request, look up each of its keys, hand any block
+        * found to the scheduler via tripPendingKey(), and then finish the
+        * registration. Priority indices are scanned from 0 upwards; within one
+        * index transient requests are taken before persistent ones. While
+        * nothing is queued this waits, waking after at most 100 seconds, and
+        * each time asks the database thread to load more persistent requests.
+        */
+       private void realRun() {
+               boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               Key[] keys = null;
+               SendableGet getter = null;
+               boolean persistent = false;
+               boolean dontCache = false;
+               ClientRequestScheduler sched = null;
+               DatastoreCheckerItem item = null;
+               BlockSet blocks = null;
+               synchronized(this) {
+                       while(true) {
+                               for(short prio = 0;prio<transientKeys.length;prio++) {
+                                       if(!transientKeys[prio].isEmpty()) {
+                                               keys = transientKeys[prio].remove(0);
+                                               getter = transientGetters[prio].remove(0);
+                                               persistent = false;
+                                               item = null;
+                                               blocks = transientBlockSets[prio].remove(0);
+                                               break;
+                                       } else if(!persistentGetters[prio].isEmpty()) {
+                                               keys = persistentKeys[prio].remove(0);
+                                               getter = persistentGetters[prio].remove(0);
+                                               persistent = true;
+                                               dontCache = persistentDontCache[prio].remove(0);
+                                               sched = persistentSchedulers[prio].remove(0);
+                                               item = persistentCheckerItems[prio].remove(0);
+                                               blocks = persistentBlockSets[prio].remove(0);
+                                               break;
+                                       }
+                               }
+                               if(keys == null) {
+                                       try {
+                                               wait(100*1000);
+                                       } catch (InterruptedException e) {
+                                               // Ok
+                                       }
+                                       context.jobRunner.queue(loader, NativeThread.HIGH_PRIORITY, true);
+                                       continue;
+                               }
+                               break;
+                       }
+               }
+               if(!persistent) {
+                       // Persistent requests had these recorded when they were queued.
+                       dontCache = getter.dontCache();
+                       sched = getter.getScheduler(context);
+               }
+               boolean anyValid = false;
+               for(Key key : keys) {
+                       KeyBlock block = null;
+                       if(blocks != null)
+                               block = blocks.get(key);
+                       // Fall back to the node when there is no BlockSet, or when the
+                       // BlockSet does not hold this key.
+                       if(block == null)
+                               block = node.fetch(key, dontCache);
+                       if(block != null) {
+                               if(logMINOR) Logger.minor(this, "Found key");
+                               sched.tripPendingKey(block);
+                       } else {
+                               // At least one key was not found.
+                               anyValid = true;
+                       }
+               }
+               if(persistent) {
+                       // We took an entry from the persistent queue, so ask for a refill.
+                       context.jobRunner.queue(loader, NativeThread.HIGH_PRIORITY, true);
+                       final SendableGet get = getter;
+                       final ClientRequestScheduler scheduler = sched;
+                       final boolean valid = anyValid;
+                       final DatastoreCheckerItem it = item;
+                       // Persistent requests finish registration in a job on the job runner.
+                       context.jobRunner.queue(new DBJob() {
+
+                               public void run(ObjectContainer container, ClientContext context) {
+                                       scheduler.finishRegister(new SendableGet[] { get }, true, true, valid, it);
+                                       loader.run(container, context);
+                               }
+
+                       }, NativeThread.NORM_PRIORITY, false);
+               } else {
+                       sched.finishRegister(new SendableGet[] { getter }, false, false, anyValid, item);
+               }
+       }
+       
+       /** Notify every thread waiting on this checker's monitor. */
+       void wakeUp() {
+               synchronized(this) {
+                       notifyAll();
+               }
+       }
+
+       /**
+        * Queue the initial load of persistent requests, then start the
+        * checker loop on the given executor.
+        * @param executor Executor to run this checker on.
+        * @param name Name passed to the executor for the job.
+        */
+       public void start(Executor executor, String name) {
+               // One step below HIGH_PRIORITY for the initial load.
+               final int loaderPriority = NativeThread.HIGH_PRIORITY - 1;
+               context.jobRunner.queue(loader, loaderPriority, true);
+               executor.execute(this, name);
+       }
+
+       /**
+        * @return The priority reported for this runnable: NativeThread.NORM_PRIORITY.
+        */
+       public int getPriority() {
+               return NativeThread.NORM_PRIORITY;
+       }
+       
+}

Added: branches/db4o/freenet/src/freenet/client/async/DatastoreCheckerItem.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DatastoreCheckerItem.java    
                        (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/DatastoreCheckerItem.java    
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,26 @@
+package freenet.client.async;
+
+import freenet.node.SendableGet;
+
+/**
+ * Persistent tag for a persistent request which needs to check the datastore
+ * and then be registered.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ *
+ */
+public class DatastoreCheckerItem {
+
+       // NOTE: DatastoreChecker.loadPersistentRequests() queries these objects by
+       // the field names "nodeDBHandle" and "prio", so renaming them would break it.
+
+       /** Compared with context.nodeDBHandle when DatastoreChecker loads items. */
+       final long nodeDBHandle;
+       /** The request to be checked and then registered. */
+       final SendableGet getter;
+       /** Priority class the request was queued at. */
+       final short prio;
+       /** Set by DatastoreChecker to the boot ID that has queued this item in
+        * memory; reset to 0 when the item is trimmed from the in-memory queue. */
+       long chosenBy;
+       /** BlockSet supplied with the request. */
+       final BlockSet blocks;
+
+       /**
+        * @param getter The request to check and register.
+        * @param nodeDBHandle Handle stored for later matching against the context.
+        * @param prio Priority class of the request.
+        * @param blocks BlockSet supplied with the request.
+        */
+       DatastoreCheckerItem(SendableGet getter, long nodeDBHandle, short prio, BlockSet blocks) {
+               this.getter = getter;
+               this.nodeDBHandle = nodeDBHandle;
+               this.prio = prio;
+               this.blocks = blocks;
+       }
+
+}

Added: branches/db4o/freenet/src/freenet/client/async/HasKeyListener.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/HasKeyListener.java          
                (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/HasKeyListener.java  
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,30 @@
+package freenet.client.async;
+
+import java.io.IOException;
+
+import com.db4o.ObjectContainer;
+
+/**
+ * Interface to show that we can create a KeyListener callback.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ */
+public interface HasKeyListener {
+
+       /**
+        * Create a KeyListener, a transient object used to determine which keys we
+        * want, and to handle any blocks found.
+        * @param container Database handle.
+        * @param context Client context.
+        * @return Null if the HasKeyListener is finished/cancelled/etc.
+        * @throws KeyListenerConstructionException If the listener cannot be
+        * created.
+        */
+       KeyListener makeKeyListener(ObjectContainer container, ClientContext context) throws KeyListenerConstructionException;
+
+       /**
+        * Is it cancelled?
+        * @param container Database handle.
+        */
+       boolean isCancelled(ObjectContainer container);
+
+       /**
+        * Notify that makeKeyListener() failed.
+        * @param e The exception describing the failure.
+        * @param container Database handle.
+        * @param context Client context.
+        */
+       void onFailed(KeyListenerConstructionException e, ObjectContainer container, ClientContext context);
+}

Added: branches/db4o/freenet/src/freenet/client/async/KeyListener.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/KeyListener.java             
                (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/KeyListener.java     
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,93 @@
+package freenet.client.async;
+
+import com.db4o.ObjectContainer;
+
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.node.SendableGet;
+
+/**
+ * Transient object created on startup for persistent requests (or at creation
+ * time for non-persistent requests), to monitor the stream of successfully
+ * fetched keys. If a key appears interesting, we schedule a job on the 
database
+ * thread to double-check and process the data if we still want it.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ * 
+ * saltedKey is the routing key from the key, salted globally (concat a global
+ * salt value and then SHA) in order to save some cycles. Implementations that
+ * use two internal bloom filters may need to have an additional local salt, as
+ * in SplitFileFetcherKeyListener.
+ */
+public interface KeyListener {
+
+       /**
+        * Fast guess at whether we want a key or not. Usually implemented by a
+        * bloom filter.
+        * LOCKING: Should avoid external locking if possible. Will be called
+        * within the CRSBase lock.
+        * @param key The key to test.
+        * @param saltedKey The globally salted routing key for the key.
+        * @return True if we probably want the key. False if we definitely don't
+        * want it.
+        */
+       public boolean probablyWantKey(Key key, byte[] saltedKey);
+
+       /**
+        * Do we want the key? This is called by the ULPR code, because fetching the
+        * key will involve significant work. tripPendingKey() on the other hand
+        * will go straight to handleBlock().
+        * @param key The key to test.
+        * @param saltedKey The globally salted routing key for the key.
+        * @param container Database handle.
+        * @param context Client context.
+        * @return -1 if we don't want the key, otherwise the priority of the request
+        * interested in the key.
+        */
+       public short definitelyWantKey(Key key, byte[] saltedKey, ObjectContainer container, ClientContext context);
+
+       /**
+        * Find the requests related to a specific key, used in retrying after cooldown.
+        * Caller should call probablyWantKey() first.
+        * @return The matching requests. May be null when there are none:
+        * SingleKeyListener returns null for a key other than its own.
+        */
+       public SendableGet[] getRequestsForKey(Key key, byte[] saltedKey, ObjectContainer container, ClientContext context);
+
+       /**
+        * Handle the found data, if we really want it.
+        * @param key The key that was found.
+        * @param saltedKey The globally salted routing key for the key.
+        * @param found The block found for the key.
+        * @return True if the block was handled. SingleKeyListener returns false
+        * when the key is not the one it is listening for.
+        */
+       public boolean handleBlock(Key key, byte[] saltedKey, KeyBlock found, ObjectContainer container, ClientContext context);
+
+       /**
+        * Is this related to a persistent request?
+        */
+       boolean persistent();
+
+       /**
+        * Priority of the associated request.
+        * LOCKING: Should avoid external locking if possible. Will be called
+        * within the CRSBase lock.
+        * @param container Database handle.
+        */
+       short getPriorityClass(ObjectContainer container);
+
+       /**
+        * @return True if when checking the datastore on initial registration, we
+        * should not promote any blocks found.
+        */
+       public abstract boolean dontCache();
+
+       /**
+        * @return Count of keys for this listener. SingleKeyListener returns 1
+        * until it is done and 0 afterwards.
+        */
+       public long countKeys();
+
+       /**
+        * @return The parent HasKeyListener. This does mean it will be pinned in
+        * RAM, but it can be deactivated so it's not a big deal.
+        * LOCKING: Should avoid external locking if possible. Will be called
+        * within the CRSBase lock.
+        */
+       public HasKeyListener getHasKeyListener();
+
+       /**
+        * Deactivate the request once it has been removed.
+        */
+       public void onRemove();
+
+       /**
+        * Has the request finished? If every key has been found, or enough keys have
+        * been found, return true so that the caller can remove it from the list.
+        */
+       public boolean isEmpty();
+
+}

Added: 
branches/db4o/freenet/src/freenet/client/async/KeyListenerConstructionException.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/KeyListenerConstructionException.java
                                (rev 0)
+++ 
branches/db4o/freenet/src/freenet/client/async/KeyListenerConstructionException.java
        2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,20 @@
+package freenet.client.async;
+
+import freenet.client.FetchException;
+
+/**
+ * Thrown when creating a KeyListener fails.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ *
+ */
+class KeyListenerConstructionException extends Exception {
+
+       /** Exceptions are Serializable; pin the version rather than relying on
+        * the compiler-generated default. */
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * @param e The FetchException that made construction fail; kept as
+        * the cause.
+        */
+       KeyListenerConstructionException(FetchException e) {
+               super(e);
+       }
+
+       /**
+        * @return The FetchException passed to the constructor. The cast is
+        * safe because the constructor only accepts a FetchException as cause.
+        */
+       public FetchException getFetchException() {
+               return (FetchException) getCause();
+       }
+
+}

Added: branches/db4o/freenet/src/freenet/client/async/SingleKeyListener.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleKeyListener.java       
                        (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/SingleKeyListener.java       
2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,90 @@
+package freenet.client.async;
+
+import com.db4o.ObjectContainer;
+
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.node.SendableGet;
+
+/**
+ * KeyListener for a request that wants exactly one key. Once the key has
+ * been handled, or the listener has been removed, it reports itself as
+ * done/empty.
+ */
+public class SingleKeyListener implements KeyListener {
+
+       private final Key key;
+       private final BaseSingleFileFetcher fetcher;
+       private final boolean dontCache;
+       /** Volatile because it is read without holding any lock (countKeys(),
+        * probablyWantKey(), isEmpty()), while the KeyListener contract asks
+        * implementations to avoid external locking. It is only ever set to
+        * true, so a plain volatile write is sufficient. */
+       private volatile boolean done;
+       private final short prio;
+       private final boolean persistent;
+
+       /**
+        * @param key The single key this listener waits for.
+        * @param fetcher The fetcher to notify when the key is found.
+        * @param dontCache Value returned by dontCache().
+        * @param prio Priority class returned for this key.
+        * @param persistent Whether the fetcher must be activated/deactivated
+        * in the database around the callback.
+        */
+       public SingleKeyListener(Key key, BaseSingleFileFetcher fetcher, boolean dontCache, short prio, boolean persistent) {
+               this.key = key;
+               this.fetcher = fetcher;
+               this.dontCache = dontCache;
+               this.prio = prio;
+               this.persistent = persistent;
+       }
+
+       /** @return 0 once done, otherwise 1. */
+       public long countKeys() {
+               if(done) return 0;
+               else return 1;
+       }
+
+       /** @return The priority if the key is ours, otherwise -1. */
+       public short definitelyWantKey(Key key, byte[] saltedKey, ObjectContainer container,
+                       ClientContext context) {
+               if(!key.equals(this.key)) return -1;
+               else return prio;
+       }
+
+       public boolean dontCache() {
+               return dontCache;
+       }
+
+       public HasKeyListener getHasKeyListener() {
+               return fetcher;
+       }
+
+       public short getPriorityClass(ObjectContainer container) {
+               return prio;
+       }
+
+       /** @return The fetcher in a one-element array, or null if the key is
+        * not ours. */
+       public SendableGet[] getRequestsForKey(Key key, byte[] saltedKey, ObjectContainer container,
+                       ClientContext context) {
+               if(!key.equals(this.key)) return null;
+               return new SendableGet[] { fetcher };
+       }
+
+       /**
+        * Pass the block to the fetcher if the key is ours, then mark this
+        * listener as done.
+        * @return False if the key is not ours, true otherwise.
+        */
+       public boolean handleBlock(Key key, byte[] saltedKey, KeyBlock found,
+                       ObjectContainer container, ClientContext context) {
+               if(!key.equals(this.key)) return false;
+               if(persistent)
+                       container.activate(fetcher, 1);
+               fetcher.onGotKey(key, found, container, context);
+               if(persistent)
+                       container.deactivate(fetcher, 1);
+               done = true;
+               return true;
+       }
+
+       /** @return A new one-element array holding our key. */
+       public Key[] listKeys(ObjectContainer container) {
+               return new Key[] { key };
+       }
+
+       public boolean persistent() {
+               return persistent;
+       }
+
+       /** @return False once done; otherwise whether the key is ours. */
+       public boolean probablyWantKey(Key key, byte[] saltedKey) {
+               if(done) return false;
+               return key.equals(this.key);
+       }
+
+       /** Mark this listener as done. */
+       public void onRemove() {
+               done = true;
+       }
+
+       public boolean isEmpty() {
+               return done;
+       }
+
+}

Added: 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java 
                            (rev 0)
+++ 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java 
    2008-08-21 12:37:19 UTC (rev 22070)
@@ -0,0 +1,308 @@
+package freenet.client.async;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+
+import com.db4o.ObjectContainer;
+
+import freenet.crypt.SHA256;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.node.PrioRunnable;
+import freenet.node.SendableGet;
+import freenet.support.BinaryBloomFilter;
+import freenet.support.CountingBloomFilter;
+import freenet.support.Logger;
+import freenet.support.io.NativeThread;
+
+/**
+ * KeyListener implementation for SplitFileFetcher.
+ * Details:
+ * - We have a bloom filter. This is kept in RAM, but stored in a file. It is a
+ * counting filter which is created with the splitfile; when a block is 
+ * completed, it is removed from the filter, and we schedule a write after a
+ * certain period of time (we ensure that the write doesn't happen before 
that).
+ * Hence even on a fast node, we won't have to write the filter so frequently
+ * as to be a problem. We could use mmap'ed filters, but that might also be a
+ * problem with fd's.
+ * - When a block is actually found, on the database thread, we load the per-
+ * segment bloom filters from the SplitFileFetcher, and thus determine which 
+ * segment it belongs to. These are non-counting and static.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ * 
+ * LOCKING: Synchronize when changing something, and writing to disk. 
+ * Don't need to synchronize on read in most cases, at least for sane 
+ * BloomFilter implementations (that is, counting with counting width less than
+ * and divisible into 8).
+ */
+public class SplitFileFetcherKeyListener implements KeyListener {
+       
+       private final SplitFileFetcher fetcher;
+       private final boolean persistent;
+       private int keyCount;
+       private final byte[] filterBuffer;
+       private final CountingBloomFilter filter;
+       /** All the segment's bloom filters, stuck together into a single blob
+        * so can be read/written en bloc */
+       private final byte[] segmentsFilterBuffer;
+       private final BinaryBloomFilter[] segmentFilters;
+       /** We store the Bloom filter to this file, but we don't map it, since 
we
+        * can't generally afford the fd's. */
+       private final File mainBloomFile;
+       /** Stores Bloom filters for every segment. */
+       private final File altBloomFile;
+       /** Wait for this period for new data to come in before writing the 
filter.
+        * The filter is only ever subtracted from, so if we crash we just have 
a
+        * few more false positives. On a fast node with slow disk, writing on 
every 
+        * completed block could become a major bottleneck. */
+       private static final int WRITE_DELAY = 60*1000;
+       private final boolean dontCache;
+       private short prio;
+       /** Used only if we reach the per-segment bloom filters. The overall 
bloom
+        * filters use the global salt. */
+       private final byte[] localSalt;
+       private boolean killed;
+
+       /**
+        * Caller must create bloomFile, but it may be empty.
+        * @param newFilter If true, the bloom file is empty, and the bloom 
filter
+        * should be created from scratch.
+        * @throws IOException 
+        */
+       public SplitFileFetcherKeyListener(SplitFileFetcher parent, int 
keyCount, File bloomFile, File altBloomFile, int mainBloomSizeBytes, int 
mainBloomK, boolean dontCache, byte[] localSalt, int segments, int 
segmentFilterSizeBytes, int segmentBloomK, boolean persistent, boolean 
newFilter) throws IOException {
+               fetcher = parent;
+               this.persistent = persistent;
+               this.keyCount = keyCount;
+               this.mainBloomFile = persistent ? new File(bloomFile.getPath()) 
: null;
+               this.altBloomFile = persistent ? new 
File(altBloomFile.getPath()) : null;
+               this.dontCache = dontCache;
+               assert(localSalt.length == 32);
+               if(persistent) {
+                       this.localSalt = new byte[32];
+                       System.arraycopy(localSalt, 0, this.localSalt, 0, 32);
+               } else {
+                       this.localSalt = localSalt;
+               }
+               segmentsFilterBuffer = new byte[segmentFilterSizeBytes * 
segments];
+               ByteBuffer baseBuffer = ByteBuffer.wrap(segmentsFilterBuffer);
+               segmentFilters = new BinaryBloomFilter[segments];
+               int start = 0;
+               int end = segmentFilterSizeBytes;
+               for(int i=0;i<segments;i++) {
+                       baseBuffer.position(start);
+                       baseBuffer.limit(end);
+                       ByteBuffer slice = baseBuffer.slice();
+                       segmentFilters[i] = new BinaryBloomFilter(slice, 
segmentFilterSizeBytes * 8, segmentBloomK);
+                       start += segmentFilterSizeBytes;
+                       end += segmentFilterSizeBytes;
+               }
+               
+               filterBuffer = new byte[mainBloomSizeBytes];
+               if(newFilter) {
+                       filter = new CountingBloomFilter(mainBloomSizeBytes * 8 
/ 2, mainBloomK, filterBuffer);
+               } else {
+                       // Read from file.
+                       FileInputStream fis = new FileInputStream(bloomFile);
+                       DataInputStream dis = new DataInputStream(fis);
+                       dis.readFully(filterBuffer);
+                       dis.close();
+                       filter = new CountingBloomFilter(mainBloomSizeBytes * 8 
/ 2, mainBloomK, filterBuffer);
+                       fis = new FileInputStream(altBloomFile);
+                       dis = new DataInputStream(fis);
+                       dis.readFully(segmentsFilterBuffer);
+                       dis.close();
+               }
+       }
+
+       public long countKeys() {
+               return keyCount;
+       }
+       
+       /**
+        * SplitFileFetcher adds keys in whatever blocks are convenient.
+        * @param keys
+        */
+       void addKey(Key key, int segNo, ClientContext context) {
+               byte[] saltedKey = context.getChkFetchScheduler().saltKey(key);
+               filter.addKey(saltedKey);
+               segmentFilters[segNo].addKey(localSaltKey(key));
+       }
+
+       private byte[] localSaltKey(Key key) {
+               MessageDigest md = SHA256.getMessageDigest();
+               md.update(key.getRoutingKey());
+               md.update(localSalt);
+               byte[] ret = md.digest();
+               SHA256.returnMessageDigest(md);
+               return ret;
+       }
+
+       public boolean probablyWantKey(Key key, byte[] saltedKey) {
+               return filter.checkFilter(saltedKey);
+       }
+
+       public short definitelyWantKey(Key key, byte[] saltedKey, 
ObjectContainer container,
+                       ClientContext context) {
+               // Caller has already called probablyWantKey(), so don't do it 
again.
+               byte[] salted = localSaltKey(key);
+               for(int i=0;i<segmentFilters.length;i++) {
+                       if(segmentFilters[i].checkFilter(salted)) return prio;
+               }
+               return -1;
+       }
+       
+       public boolean handleBlock(Key key, byte[] saltedKey, KeyBlock block,
+                       ObjectContainer container, ClientContext context) {
+               // Caller has already called probablyWantKey(), so don't do it 
again.
+               boolean found = false;
+               byte[] salted = localSaltKey(key);
+               for(int i=0;i<segmentFilters.length;i++) {
+                       if(segmentFilters[i].checkFilter(salted)) {
+                               if(persistent)
+                                       container.activate(fetcher, 1);
+                               SplitFileFetcherSegment segment = 
fetcher.getSegment(i);
+                               if(persistent)
+                                       container.activate(segment, 1);
+                               if(segment.onGotKey(key, block, container, 
context)) {
+                                       keyCount--;
+                                       synchronized(this) {
+                                               filter.removeKey(saltedKey);
+                                       }
+                                       // Update the persistent keyCount.
+                                       if(persistent)
+                                               container.activate(fetcher, 1);
+                                       fetcher.setKeyCount(keyCount, 
container);
+                                       if(persistent)
+                                               container.deactivate(fetcher, 
1);
+                                       found = true;
+                               }
+                               if(persistent)
+                                       container.deactivate(segment, 1);
+                       }
+               }
+               return found;
+       }
+
+       public boolean dontCache() {
+               return dontCache;
+       }
+
+       public HasKeyListener getHasKeyListener() {
+               return fetcher;
+       }
+
+       public short getPriorityClass(ObjectContainer container) {
+               return prio;
+       }
+
+       public SendableGet[] getRequestsForKey(Key key, byte[] saltedKey, 
+                       ObjectContainer container, ClientContext context) {
+               ArrayList<SendableGet> ret = new ArrayList<SendableGet>();
+               // Caller has already called probablyWantKey(), so don't do it 
again.
+               byte[] salted = localSaltKey(key);
+               for(int i=0;i<segmentFilters.length;i++) {
+                       if(segmentFilters[i].checkFilter(salted)) {
+                               if(persistent)
+                                       container.activate(fetcher, 1);
+                               SplitFileFetcherSegment segment = 
fetcher.getSegment(i);
+                               int blockNum = segment.getBlockNumber(key, 
container);
+                               if(blockNum >= 0) {
+                                       
ret.add(segment.getSubSegmentFor(blockNum, container));
+                               }
+                       }
+               }
+               return ret.toArray(new SendableGet[ret.size()]);
+       }
+
+       public void onRemove() {
+               synchronized(this) {
+                       killed = true;
+               }
+               if(persistent) {
+                       mainBloomFile.delete();
+                       altBloomFile.delete();
+               }
+       }
+
+       public boolean persistent() {
+               return persistent;
+       }
+
+       public void writeFilters() throws IOException {
+               if(!persistent) return;
+               RandomAccessFile raf = new RandomAccessFile(mainBloomFile, 
"rw");
+               raf.write(filterBuffer);
+               raf.close();
+               raf = new RandomAccessFile(altBloomFile, "rw");
+               raf.write(segmentsFilterBuffer);
+               raf.close();
+       }
+
+       public synchronized int killSegment(SplitFileFetcherSegment segment, 
ObjectContainer container, ClientContext context) {
+               int segNo = segment.segNum;
+               segmentFilters[segNo].unsetAll();
+               Key[] removeKeys = segment.listKeys(container);
+               for(int i=0;i<removeKeys.length;i++) {
+                       byte[] salted = 
context.getChkFetchScheduler().saltKey(removeKeys[i]);
+                       if(filter.checkFilter(salted)) {
+                               filter.removeKey(salted);
+                       } else
+                               // Huh??
+                               Logger.error(this, "Removing key 
"+removeKeys[i]+" from "+segment+" : NOT IN BLOOM FILTER!");
+               }
+               scheduleWriteFilters(context);
+               return keyCount -= removeKeys.length;
+       }
+
+       private boolean writingBloomFilter;
+       
+       /** Arrange to write the filters, at some point after this transaction 
is
+        * committed. */
+       private void scheduleWriteFilters(ClientContext context) {
+               synchronized(this) {
+                       // Worst case, we end up blocking the database thread 
while a write completes off thread.
+                       // Common case, the write executes on a separate thread.
+                       // Don't run the write at too low a priority or we may 
get priority inversion.
+                       if(writingBloomFilter) return;
+                       writingBloomFilter = true;
+                       try {
+                               context.ticker.queueTimedJob(new PrioRunnable() 
{
+
+                                       public void run() {
+                                               
synchronized(SplitFileFetcherKeyListener.this) {
+                                                       try {
+                                                               writeFilters();
+                                                       } catch (IOException e) 
{
+                                                               
Logger.error(this, "Failed to write bloom filters, we will have more false 
positives on already-found blocks which aren't in the store: "+e, e);
+                                                       } finally {
+                                                               
writingBloomFilter = true;
+                                                       }
+                                               }
+                                       }
+
+                                       public int getPriority() {
+                                               // Don't run the write at too 
low a priority or we may get priority inversion.
+                                               return 
NativeThread.HIGH_PRIORITY;
+                                       }
+                                       
+                               }, WRITE_DELAY);
+                       } catch (Throwable t) {
+                               writingBloomFilter = false;
+                       }
+               }
+       }
+
+       public boolean isEmpty() {
+               // FIXME: We rely on SplitFileFetcher unregistering itself.
+               // Maybe we should keep track of how many segments have been 
cleared?
+               // We'd have to be sure that they weren't cleared twice...?
+               return killed;
+       }
+
+}


Reply via email to