Author: toad
Date: 2008-08-13 12:14:42 +0000 (Wed, 13 Aug 2008)
New Revision: 21799
Modified:
branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
Log:
Don't finish by size, finish when none started and none not started.
Add to blocksStarted when start.
Remove blocks from blocksFinished in finish() when handle them.
Logging. Timeout after 20 minutes from selection: if not finished, log an error.
Log an error if finish twice.
Modified:
branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
2008-08-13 12:13:04 UTC (rev 21798)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
2008-08-13 12:14:42 UTC (rev 21799)
@@ -39,13 +39,13 @@
public transient final boolean localRequestOnly;
public transient final boolean cacheLocalRequests;
public transient final boolean ignoreStore;
- public transient final int size;
public transient final ArrayList<PersistentChosenBlock>
blocksNotStarted;
public transient final ArrayList<PersistentChosenBlock> blocksStarted;
public transient final ArrayList<PersistentChosenBlock> blocksFinished;
public final RequestScheduler scheduler;
public final SendableRequestSender sender;
private boolean logMINOR;
+ private boolean finished;
PersistentChosenRequest(SendableRequest req, short prio, int
retryCount, ObjectContainer container, RequestScheduler sched, ClientContext
context) {
request = req;
@@ -76,7 +76,19 @@
sender = req.getSender(container, context);
if(!reqActive)
container.deactivate(req, 1);
- size = blocksNotStarted.size();
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+
+ // Remove debugging check? How reasonable is the timeout for
inserts??
+ context.ticker.queueTimedJob(new Runnable() {
+
+ public void run() {
+ synchronized(PersistentChosenRequest.this) {
+ if(finished) return;
+ Logger.error(this, "Still not finished
after timeout: "+this);
+ }
+ }
+
+ }, 20*60*1000);
}
void onFinished(PersistentChosenBlock block, ClientContext context) {
@@ -104,9 +116,9 @@
return;
}
blocksFinished.add(block);
- if(blocksFinished.size() < size) {
+ if(!(blocksNotStarted.isEmpty() &&
blocksStarted.isEmpty())) {
if(logMINOR)
- Logger.minor(this, "Blocks finished:
"+blocksFinished.size()+" of "+size+" on "+this+" for "+request);
+ Logger.minor(this, "Not finishing yet:
blocks not started: "+blocksNotStarted.size()+" started:
"+blocksStarted.size()+" finished: "+blocksFinished.size());
return;
}
}
@@ -114,19 +126,41 @@
context.jobRunner.queue(new DBJob() {
public void run(ObjectContainer container,
ClientContext context) {
- finish(container, context);
+ finish(container, context, false);
}
}, NativeThread.NORM_PRIORITY + 1, false);
}
- private void finish(ObjectContainer container, ClientContext context) {
- Logger.error(this, "Finishing "+this+" for "+request);
+ private void finish(ObjectContainer container, ClientContext context,
boolean dumping) {
+ container.activate(request, 1);
+ Logger.normal(this, "Finishing "+this+" for "+request);
// Call all the callbacks.
+ PersistentChosenBlock[] finishedBlocks;
+ synchronized(this) {
+ if(finished) {
+ if(blocksFinished.isEmpty()) {
+ // Okay...
+ return;
+ } else {
+ Logger.error(this, "Finished but
blocksFinished not empty on "+this, new Exception("debug"));
+ // Process the blocks...
+ }
+ }
+ finished = true;
+ finishedBlocks = blocksFinished.toArray(new
PersistentChosenBlock[blocksFinished.size()]);
+ }
+ if(finishedBlocks.length == 0) {
+ if(!dumping)
+ Logger.error(this, "No finished blocks in
finish() on "+this);
+ else if(logMINOR)
+ Logger.minor(this, "No finished blocks in
finish() on "+this);
+ return;
+ }
if(request instanceof SendableGet) {
boolean supportsBulk = request instanceof
SupportsBulkCallFailure;
Vector<BulkCallFailureItem> bulkFailItems = null;
- for(PersistentChosenBlock block : blocksFinished) {
+ for(PersistentChosenBlock block : finishedBlocks) {
if(!block.fetchSucceeded()) {
LowLevelGetException e =
block.failedGet();
if(supportsBulk) {
@@ -146,7 +180,7 @@
container.commit(); // db4o is read-committed,
so we need to commit here.
}
} else /*if(request instanceof SendableInsert)*/ {
- for(PersistentChosenBlock block : blocksFinished) {
+ for(PersistentChosenBlock block : finishedBlocks) {
container.activate(request, 1);
if(block.insertSucceeded()) {
((SendableInsert)request).onSuccess(block.token, container, context);
@@ -163,8 +197,11 @@
public synchronized ChosenBlock grabNotStarted(Random random) {
int size = blocksNotStarted.size();
if(size == 0) return null;
- if(size == 1) return blocksNotStarted.remove(0);
- return blocksNotStarted.remove(random.nextInt(size));
+ PersistentChosenBlock ret;
+ if(size == 1) ret = blocksNotStarted.remove(0);
+ else ret = blocksNotStarted.remove(random.nextInt(size));
+ blocksStarted.add(ret);
+ return ret;
}
public synchronized int sizeNotStarted() {
@@ -184,7 +221,7 @@
}
if(!wasStarted) {
if(logMINOR) Logger.minor(this, "Finishing immediately
in onDumped() as nothing pending: "+this);
- finish(container, core.sched.clientContext);
+ finish(container, core.sched.clientContext, true);
}
}
}