Author: toad
Date: 2009-04-18 23:22:39 +0000 (Sat, 18 Apr 2009)
New Revision: 27023

Modified:
   trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
   trunk/freenet/src/freenet/client/async/USKFetcher.java
   trunk/freenet/src/freenet/client/async/USKManager.java
Log:
Check the next 50 datastore slots before starting actual requests.


Modified: trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2009-04-18 20:17:47 UTC (rev 27022)
+++ trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2009-04-18 23:22:39 UTC (rev 27023)
@@ -396,6 +396,7 @@
                                        for(int i=0;i<getters.length;i++) {
                                                SendableGet getter = getters[i];
                                                container.activate(getter, 1);
+                                               getter.preRegister(container, 
clientContext);
                                                
if(!(getter.isCancelled(container) || getter.isEmpty(container))) {
                                                        wereAnyValid = true;
                                                        
schedCore.innerRegister(getter, random, container, getters);
@@ -422,6 +423,7 @@
                                                        
if(container.ext().isActive(getter))
                                                                
Logger.error(this, "ALREADY ACTIVE in delayed finishRegister: "+getter);
                                                        
container.activate(getter, 1);
+                                                       
getter.preRegister(container, clientContext);
                                                        
if(!(getter.isCancelled(container) || getter.isEmpty(container))) {
                                                                wereAnyValid = 
true;
                                                                
schedCore.innerRegister(getter, random, container, getters);
@@ -442,8 +444,11 @@
                } else {
                        if(!anyValid) return;
                        // Register immediately.
-                       for(int i=0;i<getters.length;i++)
+                       for(int i=0;i<getters.length;i++) {
+                               getters[i].preRegister(container, 
clientContext);
+                               if(getters[i].isCancelled(null) || 
getters[i].isEmpty(null)) continue;
                                schedTransient.innerRegister(getters[i], 
random, null, getters);
+                       }
                        starter.wakeUp();
                }
        }

Modified: trunk/freenet/src/freenet/client/async/USKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKFetcher.java      2009-04-18 
20:17:47 UTC (rev 27022)
+++ trunk/freenet/src/freenet/client/async/USKFetcher.java      2009-04-18 
23:22:39 UTC (rev 27023)
@@ -8,11 +8,13 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Vector;
 
 import com.db4o.ObjectContainer;
 
 import freenet.client.FetchContext;
+import freenet.keys.ClientKey;
 import freenet.keys.ClientSSK;
 import freenet.keys.ClientSSKBlock;
 import freenet.keys.FreenetURI;
@@ -21,8 +23,13 @@
 import freenet.keys.KeyDecodeException;
 import freenet.keys.SSKBlock;
 import freenet.keys.USK;
+import freenet.node.KeysFetchingLocally;
+import freenet.node.LowLevelGetException;
+import freenet.node.RequestClient;
+import freenet.node.RequestScheduler;
 import freenet.node.RequestStarter;
 import freenet.node.SendableGet;
+import freenet.node.SendableRequestItem;
 import freenet.support.LogThresholdCallback;
 import freenet.support.Logger;
 import freenet.support.api.Bucket;
@@ -118,8 +125,10 @@
        /** A list of keys which we are interested in. This a sequence of SSKs 
starting 
         * at the last known slot. */
        private final ArrayList<ClientSSK> keysWatching;
+       private long checkedDatastoreUpTo;
+       private final ArrayList<USKAttempt> attemptsToStart;
        
-       private static final int WATCH_KEYS = 500;
+       private static final int WATCH_KEYS = 50;
        
        /**
         * Callbacks are told when the USKFetcher finishes, and unless 
background poll is
@@ -269,6 +278,7 @@
                this.backgroundPoll = pollForever;
                this.keepLastData = keepLastData;
                keysWatching = new ArrayList<ClientSSK>();
+               attemptsToStart = new ArrayList<USKAttempt>();
        }
        
        void onDNF(USKAttempt att, ClientContext context) {
@@ -354,7 +364,6 @@
        }
 
        void onSuccess(USKAttempt att, boolean dontUpdate, ClientSSKBlock 
block, final ClientContext context) {
-               LinkedList<USKAttempt> l = null;
                final long lastEd = uskManager.lookupLatestSlot(origUSK);
                long curLatest;
                boolean decode = false;
@@ -364,19 +373,18 @@
                        if(completed || cancelled) return;
                        decode = curLatest >= lastEd && !(dontUpdate && block 
== null);
                        curLatest = Math.max(lastEd, curLatest);
-                       fillKeysWatching(curLatest);
                        if(logMINOR) Logger.minor(this, "Latest: "+curLatest);
                        long addTo = curLatest + minFailures;
                        long addFrom = Math.max(lastAddedEdition + 1, curLatest 
+ 1);
                        if(logMINOR) Logger.minor(this, "Adding from 
"+addFrom+" to "+addTo+" for "+origUSK);
                        if(addTo >= addFrom) {
-                               l = new LinkedList<USKAttempt>();
                                for(long i=addFrom;i<=addTo;i++) {
                                        if(logMINOR) Logger.minor(this, "Adding 
checker for edition "+i+" for "+origUSK);
-                                       l.add(add(i));
+                                       attemptsToStart.add(add(i));
                                }
                        }
                        cancelBefore(curLatest, context);
+                       fillKeysWatching(curLatest+1, context);
                }
                Bucket data = null;
                if(decode) {
@@ -403,29 +411,6 @@
                }
                if(!dontUpdate)
                        uskManager.updateSlot(origUSK, curLatest, context);
-               if(l == null) return;
-               final LinkedList<USKAttempt> toSched = l;
-               // If we schedule them here, we don't get icky recursion 
problems.
-               if(!cancelled) {
-                       context.mainExecutor.execute(new Runnable() {
-                               public void run() {
-                                       long last = lastEd;
-                                       for(Iterator<USKAttempt> 
i=toSched.iterator();i.hasNext();) {
-                                               // We may be called recursively 
through onSuccess().
-                                               // So don't start obsolete 
requests.
-                                               USKAttempt a = i.next();
-                                               last = 
uskManager.lookupLatestSlot(origUSK);
-                                               if((last <= a.number) && 
!a.cancelled)
-                                                       a.schedule(null, 
context);
-                                               else {
-                                                       synchronized(this) {
-                                                               
runningAttempts.remove(a);
-                                                       }
-                                               }
-                                       }
-                               }
-                       }, "USK scheduler"); // Run on separate thread because 
otherwise can get loooong recursions
-               }
        }
 
        void onCancelled(USKAttempt att, ClientContext context) {
@@ -534,29 +519,13 @@
                long lookedUp = uskManager.lookupLatestSlot(origUSK);
                synchronized(this) {
                        valueAtSchedule = Math.max(lookedUp, valueAtSchedule);
-                       fillKeysWatching(valueAtSchedule);
                        if(cancelled) return;
                        long startPoint = Math.max(origUSK.suggestedEdition, 
valueAtSchedule);
                        for(long i=startPoint;i<startPoint+minFailures;i++)
-                               add(i);
-                       attempts = runningAttempts.toArray(new 
USKAttempt[runningAttempts.size()]);
+                               attemptsToStart.add(add(i));
                        started = true;
+                       fillKeysWatching(valueAtSchedule, context);
                }
-               if(!cancelled) {
-                       for(int i=0;i<attempts.length;i++) {
-                               // Race conditions happen here and waste a lot 
more time than this simple check.
-                               long lastEd = 
uskManager.lookupLatestSlot(origUSK);
-                               if(keepLastData && lastEd == lookedUp)
-                                       lastEd--; // If we want the data, then 
get it for the known edition, so we always get the data, so USKInserter can 
compare it and return the old edition if it is identical.
-                               if(attempts[i].number > lastEd)
-                                       attempts[i].schedule(container, 
context);
-                               else {
-                                       synchronized(this) {
-                                               
runningAttempts.remove(attempts[i]);
-                                       }
-                               }
-                       }
-               }
        }
 
        public void cancel(ObjectContainer container, ClientContext context) {
@@ -695,26 +664,24 @@
 
        public void onFoundEdition(long ed, USK key, ObjectContainer container, 
final ClientContext context, boolean metadata, short codec, byte[] data, 
boolean newKnownGood, boolean newSlotToo) {
                if(newKnownGood && !newSlotToo) return; // Only interested in 
slots
-               LinkedList<USKAttempt> l = null;
                final long lastEd = uskManager.lookupLatestSlot(origUSK);
                boolean decode = false;
                synchronized(this) {
                        if(completed || cancelled) return;
                        decode = lastEd >= ed && data != null;
                        ed = Math.max(lastEd, ed);
-                       fillKeysWatching(ed);
                        if(logMINOR) Logger.minor(this, "Latest: "+ed);
                        long addTo = ed + minFailures;
                        long addFrom = Math.max(lastAddedEdition + 1, ed + 1);
                        if(logMINOR) Logger.minor(this, "Adding from 
"+addFrom+" to "+addTo+" for "+origUSK);
                        if(addTo >= addFrom) {
-                               l = new LinkedList<USKAttempt>();
                                for(long i=addFrom;i<=addTo;i++) {
                                        if(logMINOR) Logger.minor(this, "Adding 
checker for edition "+i+" for "+origUSK);
-                                       l.add(add(i));
+                                       attemptsToStart.add(add(i));
                                }
                        }
                        cancelBefore(ed, context);
+                       fillKeysWatching(ed+1, context);
                }
                synchronized(this) {
                        if (decode) {
@@ -732,32 +699,12 @@
                                        }
                        }
                }
-               if(l == null) return;
-               final LinkedList<USKAttempt> toSched = l;
-               // If we schedule them here, we don't get icky recursion 
problems.
-               if(!cancelled) {
-                       context.mainExecutor.execute(new Runnable() {
-                               public void run() {
-                                       long last = lastEd;
-                                       for(Iterator<USKAttempt> 
i=toSched.iterator();i.hasNext();) {
-                                               // We may be called recursively 
through onSuccess().
-                                               // So don't start obsolete 
requests.
-                                               USKAttempt a = i.next();
-                                               last = 
uskManager.lookupLatestSlot(origUSK);
-                                               if((last <= a.number) && 
!a.cancelled)
-                                                       a.schedule(null, 
context);
-                                               else {
-                                                       synchronized(this) {
-                                                               
runningAttempts.remove(a);
-                                                       }
-                                               }
-                                       }
-                               }
-                       }, "USK scheduler"); // Run on separate thread because 
otherwise can get loooong recursions
-               }
        }
 
-       private synchronized void fillKeysWatching(long ed) {
+       private boolean runningStoreChecker = false;
+       
+       private synchronized void fillKeysWatching(long ed, ClientContext 
context) {
+               if(logMINOR) Logger.minor(this, "fillKeysWatching from "+ed+" 
for "+this+" : "+origUSK, new Exception("debug"));
 //             if(firstKeyWatching == -1) {
 //                     firstKeyWatching = ed;
 //                     for(int i=0;i<WATCH_KEYS;i++) {
@@ -785,6 +732,171 @@
 //                     }
                        firstKeyWatching = ed;
 //             }
+               if(runningStoreChecker) return;
+               long firstCheck = Math.max(firstKeyWatching, 
checkedDatastoreUpTo);
+               final long lastCheck = firstKeyWatching + keysWatching.size() - 
1;
+               if(lastCheck < firstCheck) return;
+               int checkCount = (int) (lastCheck - firstCheck + 1);
+               int offset = (int) (firstCheck - firstKeyWatching);
+               final Key[] checkStore = new Key[checkCount];
+               for(int i=0;i<checkStore.length;i++) {
+                       checkStore[i] = keysWatching.get(i+offset).getNodeKey();
+               }
+               if(logMINOR) Logger.minor(this, "Checking from "+firstCheck+" 
to "+lastCheck+" for "+this+" : "+origUSK);
+               SendableGet storeChecker = new SendableGet(parent) {
+                       
+                       boolean done = false;
+
+                       @Override
+                       public boolean dontCache(ObjectContainer container) {
+                               return false;
+                       }
+
+                       @Override
+                       public FetchContext getContext() {
+                               return this.getContext();
+                       }
+
+                       @Override
+                       public long getCooldownWakeup(Object token, 
ObjectContainer container) {
+                               return -1;
+                       }
+
+                       @Override
+                       public long getCooldownWakeupByKey(Key key, 
ObjectContainer container) {
+                               return -1;
+                       }
+
+                       @Override
+                       public ClientKey getKey(Object token, ObjectContainer 
container) {
+                               return null;
+                       }
+
+                       @Override
+                       public boolean ignoreStore() {
+                               return false;
+                       }
+
+                       @Override
+                       public Key[] listKeys(ObjectContainer container) {
+                               return checkStore;
+                       }
+
+                       @Override
+                       public void onFailure(LowLevelGetException e, Object 
token, ObjectContainer container, ClientContext context) {
+                               // Ignore
+                       }
+
+                       @Override
+                       public void requeueAfterCooldown(Key key, long time, 
ObjectContainer container, ClientContext context) {
+                               // Ignore
+                       }
+
+                       @Override
+                       public void resetCooldownTimes(ObjectContainer 
container) {
+                               // Ignore
+                       }
+
+                       @Override
+                       public boolean hasValidKeys(KeysFetchingLocally 
fetching, ObjectContainer container, ClientContext context) {
+                               return true;
+                       }
+
+                       @Override
+                       public void preRegister(ObjectContainer container, 
ClientContext context) {
+                               unregister(container, context);
+                               USKAttempt[] attempts;
+                               synchronized(USKFetcher.this) {
+                                       if(cancelled) return;
+                                       runningStoreChecker = false;
+                                       attempts = attemptsToStart.toArray(new 
USKAttempt[attemptsToStart.size()]);
+                                       attemptsToStart.clear();
+                                       done = true;
+                               }
+                               if(logMINOR) Logger.minor(this, "Checked 
datastore, finishing registration for "+attempts.length+" checkers for 
"+USKFetcher.this+" for "+origUSK);
+                                       for(int i=0;i<attempts.length;i++) {
+                                               long lastEd = 
uskManager.lookupLatestSlot(origUSK);
+                                               // FIXME not sure this 
condition works, test it!
+                                               if(keepLastData && 
lastRequestData == null && lastEd == origUSK.suggestedEdition)
+                                                       lastEd--; // If we want 
the data, then get it for the known edition, so we always get the data, so 
USKInserter can compare it and return the old edition if it is identical.
+                                               if(attempts[i].number > lastEd)
+                                                       
attempts[i].schedule(container, context);
+                                               else {
+                                                       synchronized(this) {
+                                                               
runningAttempts.remove(attempts[i]);
+                                                       }
+                                               }
+                                       }
+                       }
+
+                       @Override
+                       public SendableRequestItem 
chooseKey(KeysFetchingLocally keys, ObjectContainer container, ClientContext 
context) {
+                               return null;
+                       }
+
+                       @Override
+                       public long countAllKeys(ObjectContainer container, 
ClientContext context) {
+                               return keysWatching.size();
+                       }
+
+                       @Override
+                       public long countSendableKeys(ObjectContainer 
container, ClientContext context) {
+                               return 0;
+                       }
+
+                       @Override
+                       public RequestClient getClient(ObjectContainer 
container) {
+                               synchronized(USKFetcher.this) {
+                                       checkedDatastoreUpTo = lastCheck;
+                               }
+                               return USKFetcher.this.uskManager;
+                       }
+
+                       @Override
+                       public ClientRequester getClientRequest() {
+                               return parent;
+                       }
+
+                       @Override
+                       public short getPriorityClass(ObjectContainer 
container) {
+                               return progressPollPriority; // FIXME
+                       }
+
+                       @Override
+                       public int getRetryCount() {
+                               return 0;
+                       }
+
+                       @Override
+                       public boolean isCancelled(ObjectContainer container) {
+                               return done;
+                       }
+
+                       @Override
+                       public boolean isSSK() {
+                               return true;
+                       }
+
+                       @Override
+                       public List<PersistentChosenBlock> 
makeBlocks(PersistentChosenRequest request, RequestScheduler sched, 
ObjectContainer container, ClientContext context) {
+                               return null;
+                       }
+
+                       public boolean isEmpty(ObjectContainer container) {
+                               return done;
+                       }
+                       
+               };
+               runningStoreChecker = true;
+               try {
+                       context.getSskFetchScheduler().register(null, new 
SendableGet[] { storeChecker } , false, false, null, null, false);
+               } catch (KeyListenerConstructionException e1) {
+                       // Impossible
+                       runningStoreChecker = false;
+               } catch (Throwable t) {
+                       runningStoreChecker = false;
+                       Logger.error(this, "Unable to start: "+t, t);
+               }
        }
 
        public synchronized boolean isCancelled(ObjectContainer container) {

Modified: trunk/freenet/src/freenet/client/async/USKManager.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKManager.java      2009-04-18 
20:17:47 UTC (rev 27022)
+++ trunk/freenet/src/freenet/client/async/USKManager.java      2009-04-18 
23:22:39 UTC (rev 27023)
@@ -198,7 +198,7 @@
                                        public void onFoundEdition(long l, USK 
key, ObjectContainer container, ClientContext context, boolean metadata, short 
codec, byte[] data, boolean newKnownGood, boolean newSlotToo) {
                                                if(l <= min) return;
                                                FreenetURI uri = 
key.copy(l).getURI();
-                                               final ClientGetter get = new 
ClientGetter(new NullClientCallback(), uri, new FetchContext(fctx, 
FetchContext.IDENTICAL_MASK, false, null), 
RequestStarter.UPDATE_PRIORITY_CLASS, USKManager.this, new NullBucket(), null);
+                                               final ClientGetter get = new 
ClientGetter(new NullClientCallback(), uri, new FetchContext(fctx, 
FetchContext.IDENTICAL_MASK, false, null), 
RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS, USKManager.this, new 
NullBucket(), null);
                                                try {
                                                        get.start(null, 
context);
                                                } catch (FetchException e) {

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to