Author: toad
Date: 2008-10-02 22:20:35 +0000 (Thu, 02 Oct 2008)
New Revision: 22923
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java
branches/db4o/freenet/src/freenet/clients/http/QueueToadlet.java
branches/db4o/freenet/src/freenet/node/NodeClientCore.java
Log:
countQueuedRequests() vs countWaitingKeys().
The former actually enumerates the queue, and logs extensively to stderr,
enabling us to see exactly how big the queue is, and spot any leakage quickly.
Both are called by /queue/countRequests.html.
Load KeyListener's only once, not once per ClientRequestScheduler, and assign
them to the correct KeyListener list by isSSK().
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-10-02 22:20:17 UTC (rev 22922)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-10-02 22:20:35 UTC (rev 22923)
@@ -9,6 +9,7 @@
import java.util.LinkedList;
import com.db4o.ObjectContainer;
+import com.db4o.ObjectSet;
import freenet.client.FECQueue;
import freenet.client.FetchException;
@@ -143,6 +144,35 @@
logMINOR = Logger.shouldLog(Logger.MINOR, this);
}
+ public static void loadKeyListeners(final ObjectContainer container,
ClientContext context) {
+ ObjectSet<HasKeyListener> results =
+ container.query(HasKeyListener.class);
+ for(HasKeyListener l : results) {
+ container.activate(l, 1);
+ try {
+ if(l.isCancelled(container)) continue;
+ KeyListener listener =
l.makeKeyListener(container, context);
+ if(listener != null) {
+ if(listener.isSSK())
+
context.getSskFetchScheduler().addPersistentPendingKeys(listener);
+ else
+
context.getChkFetchScheduler().addPersistentPendingKeys(listener);
+ System.err.println("Loaded request key
listener: "+listener+" for "+l);
+ }
+ } catch (KeyListenerConstructionException e) {
+ System.err.println("FAILED TO LOAD REQUEST
BLOOM FILTERS:");
+ e.printStackTrace();
+ Logger.error(ClientRequestSchedulerCore.class,
"FAILED TO LOAD REQUEST BLOOM FILTERS: "+e, e);
+ } catch (Throwable t) {
+ // Probably an error on last startup???
+ Logger.error(ClientRequestSchedulerCore.class,
"FAILED TO LOAD REQUEST: "+t, t);
+ System.err.println("FAILED TO LOAD REQUEST:
"+t);
+ t.printStackTrace();
+ }
+ container.deactivate(l, 1);
+ }
+ }
+
public void start(NodeClientCore core) {
schedCore.start(core);
queueFillRequestStarterQueue();
@@ -623,6 +653,15 @@
else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
fuzz = 0;
synchronized(starterQueue) {
+ if((!isSSKScheduler) && (!isInsertScheduler)) {
+ Logger.minor(this, "Scheduling CHK fetches...");
+ for(SendableRequest req :
runningPersistentRequests) {
+ boolean wasActive =
container.ext().isActive(req);
+ if(!wasActive) container.activate(req,
1);
+ Logger.minor(this, "Running persistent
request: "+req);
+ if(!wasActive)
container.deactivate(req, 1);
+ }
+ }
// Recompute starterQueueLength
int length = 0;
PersistentChosenRequest old = null;
@@ -658,6 +697,9 @@
}
}
+ if((!isSSKScheduler) && (!isInsertScheduler)) {
+ Logger.minor(this, "Scheduling CHK fetches...");
+ }
while(true) {
SendableRequest request =
schedCore.removeFirstInner(fuzz, random, offeredKeys, starter, schedTransient,
false, true, Short.MAX_VALUE, Integer.MAX_VALUE, context, container);
if(request == null) return;
@@ -1036,6 +1078,10 @@
return schedCore.hasKey(key);
}
+ public long countPersistentWaitingKeys(ObjectContainer container) {
+ return schedCore.countWaitingKeys(container);
+ }
+
public long countPersistentQueuedRequests(ObjectContainer container) {
return schedCore.countQueuedRequests(container);
}
@@ -1063,5 +1109,9 @@
SHA256.returnMessageDigest(md);
return ret;
}
+
+ void addPersistentPendingKeys(KeyListener listener) {
+ schedCore.addPendingKeys(listener);
+ }
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-10-02 22:20:17 UTC (rev 22922)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-10-02 22:20:35 UTC (rev 22923)
@@ -24,6 +24,7 @@
import freenet.node.SendableInsert;
import freenet.node.SendableRequest;
import freenet.support.Logger;
+import freenet.support.RandomGrabArray;
import freenet.support.SectoredRandomGrabArrayWithInt;
import freenet.support.SectoredRandomGrabArrayWithObject;
import freenet.support.SortedVectorByNumber;
@@ -247,7 +248,7 @@
return priority;
}
- public synchronized long countQueuedRequests(ObjectContainer container)
{
+ public synchronized long countWaitingKeys(ObjectContainer container) {
long count = 0;
for(KeyListener listener : keyListeners)
count += listener.countKeys();
@@ -399,4 +400,56 @@
return sb.toString();
}
+ public synchronized long countQueuedRequests(ObjectContainer container)
{
+ long total = 0;
+ for(int i=0;i<priorities.length;i++) {
+ SortedVectorByNumber prio = priorities[i];
+ if(prio == null || prio.isEmpty())
+ System.out.println("Priority "+i+" : empty");
+ else {
+ System.out.println("Priority "+i+" :
"+prio.count());
+ for(int j=0;j<prio.count();j++) {
+ int frc = prio.getNumberByIndex(j);
+ System.out.println("Fixed retry count:
"+frc);
+ SectoredRandomGrabArrayWithInt
clientGrabber = (SectoredRandomGrabArrayWithInt) prio.get(frc, container);
+ container.activate(clientGrabber, 1);
+ System.out.println("Clients:
"+clientGrabber.size()+" for "+clientGrabber);
+ for(int k=0;k<clientGrabber.size();k++)
{
+ Object client =
clientGrabber.getClient(k);
+ container.activate(client, 1);
+ System.out.println("Client
"+k+" : "+client);
+ container.deactivate(client, 1);
+
SectoredRandomGrabArrayWithObject requestGrabber =
(SectoredRandomGrabArrayWithObject) clientGrabber.getGrabber(client);
+
container.activate(requestGrabber, 1);
+ System.out.println("SRGA for
client: "+requestGrabber);
+ for(int
l=0;l<requestGrabber.size();l++) {
+ client =
requestGrabber.getClient(l);
+
container.activate(client, 1);
+
System.out.println("Request "+l+" : "+client);
+
container.deactivate(client, 1);
+ RandomGrabArray rga =
(RandomGrabArray) requestGrabber.getGrabber(client);
+ container.activate(rga,
1);
+
System.out.println("Queued SendableRequests: "+rga.size()+" on "+rga);
+ long sendable = 0;
+ long all = 0;
+ for(int
m=0;m<rga.size();m++) {
+ SendableRequest
req = (SendableRequest) rga.get(m);
+ if(req == null)
continue;
+
container.activate(req, 1);
+ sendable +=
req.sendableKeys(container).length;
+ all +=
req.allKeys(container).length;
+
container.deactivate(req, 1);
+ }
+
System.out.println("Sendable keys: "+sendable+" all keys "+all+" diff
"+(all-sendable));
+ total += all;
+
container.deactivate(rga, 1);
+ }
+
container.deactivate(requestGrabber, 1);
+ }
+ container.deactivate(clientGrabber, 1);
+ }
+ }
+ }
+ return total;
+ }
}
Modified:
branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java
2008-10-02 22:20:17 UTC (rev 22922)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java
2008-10-02 22:20:35 UTC (rev 22923)
@@ -136,4 +136,12 @@
}
}
+ public long size(ObjectContainer container) {
+ Query query = container.query();
+ query.constrain(PersistentCooldownQueueItem.class);
+ query.descend("parent").constrain(this).identity();
+ ObjectSet results = query.execute();
+ return results.size();
+ }
+
}
Modified: branches/db4o/freenet/src/freenet/clients/http/QueueToadlet.java
===================================================================
--- branches/db4o/freenet/src/freenet/clients/http/QueueToadlet.java
2008-10-02 22:20:17 UTC (rev 22922)
+++ branches/db4o/freenet/src/freenet/clients/http/QueueToadlet.java
2008-10-02 22:20:35 UTC (rev 22923)
@@ -509,8 +509,10 @@
HTMLNode pageNode = null;
try {
if(count) {
- long queued =
core.requestStarters.chkFetchScheduler.countPersistentQueuedRequests(container);
- System.err.println("Total
queued CHK requests: "+queued);
+ long queued =
core.requestStarters.chkFetchScheduler.countPersistentWaitingKeys(container);
+ System.err.println("Total
waiting CHKs: "+queued);
+ long reallyQueued =
core.requestStarters.chkFetchScheduler.countPersistentQueuedRequests(container);
+ System.err.println("Total
queued CHK requests: "+reallyQueued);
pageNode =
pageMaker.getPageNode(L10n.getString("QueueToadlet.title", new String[]{
"nodeName" }, new String[]{ core.getMyName() }), ctx);
HTMLNode contentNode =
pageMaker.getContentNode(pageNode);
/* add alert summary box */
@@ -518,7 +520,8 @@
contentNode.addChild(core.alerts.createSummary());
HTMLNode infobox =
contentNode.addChild(pageMaker.getInfobox("infobox-information", "Queued
requests status"));
HTMLNode infoboxContent =
pageMaker.getContentNode(infobox);
- infoboxContent.addChild("#",
"Total queued CHK requests: "+queued);
+ infoboxContent.addChild("p",
"Total awaiting CHKs: "+queued);
+ infoboxContent.addChild("p",
"Total queued CHK requests: "+reallyQueued);
return;
} else {
pageNode =
handleGetInner(pageMaker, container, context, request, ctx);
Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-10-02
22:20:17 UTC (rev 22922)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-10-02
22:20:35 UTC (rev 22923)
@@ -306,6 +306,7 @@
requestStarters = new RequestStarterGroup(node, this,
portNumber, random, config, throttleFS, clientContext);
clientContext.init(requestStarters);
+ ClientRequestScheduler.loadKeyListeners(container,
clientContext);
InsertCompressor.load(container, clientContext);
node.securityLevels.addPhysicalThreatLevelListener(new
SecurityLevelListener<PHYSICAL_THREAT_LEVEL>() {