Author: toad
Date: 2009-02-03 19:12:22 +0000 (Tue, 03 Feb 2009)
New Revision: 25507
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
branches/db4o/freenet/src/freenet/node/KeysFetchingLocally.java
branches/db4o/freenet/src/freenet/node/RequestScheduler.java
branches/db4o/freenet/src/freenet/node/RequestStarter.java
Log:
Prevent the same transient insert from being selected more than once.
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2009-02-03 18:21:42 UTC (rev 25506)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2009-02-03 19:12:22 UTC (rev 25507)
@@ -45,7 +45,7 @@
public class ClientRequestScheduler implements RequestScheduler {
private final ClientRequestSchedulerCore schedCore;
- private final ClientRequestSchedulerNonPersistent schedTransient;
+ final ClientRequestSchedulerNonPersistent schedTransient;
private static boolean logMINOR;
@@ -1028,6 +1028,10 @@
public void removeFetchingKey(Key key) {
schedCore.removeFetchingKey(key);
}
+
+ public void removeTransientInsertFetching(SendableInsert insert, Object
token) {
+ schedCore.removeTransientInsertFetching(insert, token);
+ }
/**
* Map from SendableGet implementing SupportsBulkCallFailure to
BulkCallFailureItem[].
@@ -1086,6 +1090,10 @@
return schedCore.addToFetching(key);
}
+ public boolean addTransientInsertFetching(SendableInsert insert, Object
token) {
+ return schedCore.addTransientInsertFetching(insert, token);
+ }
+
public boolean hasFetchingKey(Key key) {
return schedCore.hasKey(key);
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2009-02-03 18:21:42 UTC (rev 25506)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2009-02-03 19:12:22 UTC (rev 25507)
@@ -20,6 +20,7 @@
import freenet.node.Node;
import freenet.node.RequestStarter;
import freenet.node.SendableGet;
+import freenet.node.SendableInsert;
import freenet.node.SendableRequest;
import freenet.support.Logger;
import freenet.support.PrioritizedSerialExecutor;
@@ -54,6 +55,30 @@
*/
private transient HashSet keysFetching;
+ private class RunningTransientInsert {
+
+ final SendableInsert insert;
+ final Object token;
+
+ RunningTransientInsert(SendableInsert i, Object t) {
+ insert = i;
+ token = t;
+ }
+
+ public int hashCode() {
+ return insert.hashCode() ^ token.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ if(!(o instanceof RunningTransientInsert)) return false;
+ RunningTransientInsert r = (RunningTransientInsert) o;
+ return r.insert == insert && (r.token == token ||
r.token.equals(token));
+ }
+
+ }
+
+ private transient HashSet<RunningTransientInsert>
runningTransientInserts;
+
public final byte[] globalSalt;
/**
@@ -114,10 +139,13 @@
this.sched = sched;
this.initTime = System.currentTimeMillis();
// We DO NOT want to rerun the query after consuming the
initial set...
- if(!isInsertScheduler)
+ if(!isInsertScheduler) {
keysFetching = new HashSet();
- else
+ runningTransientInserts = null;
+ } else {
keysFetching = null;
+ runningTransientInserts = new
HashSet<RunningTransientInsert>();
+ }
if(isInsertScheduler) {
preRegisterMeRunner = new DBJob() {
@@ -716,5 +744,35 @@
return ret + cooldown;
}
+ public boolean hasTransientInsert(SendableInsert insert, Object token) {
+ RunningTransientInsert tmp = new RunningTransientInsert(insert,
token);
+ synchronized(runningTransientInserts) {
+ return runningTransientInserts.contains(tmp);
+ }
+ }
+
+ public boolean addTransientInsertFetching(SendableInsert insert, Object
token) {
+ RunningTransientInsert tmp = new RunningTransientInsert(insert,
token);
+ synchronized(runningTransientInserts) {
+ boolean retval = runningTransientInserts.add(tmp);
+ if(!retval) {
+ Logger.normal(this, "Already in
runningTransientInserts: "+insert+" : "+token);
+ } else {
+ if(logMINOR)
+ Logger.minor(this, "Added to
runningTransientInserts: "+insert+" : "+token);
+ }
+ return retval;
+ }
+ }
+
+ public void removeTransientInsertFetching(SendableInsert insert, Object
token) {
+ RunningTransientInsert tmp = new RunningTransientInsert(insert,
token);
+ if(logMINOR)
+ Logger.minor(this, "Removing from
runningTransientInserts: "+insert+" : "+token);
+ synchronized(runningTransientInserts) {
+ runningTransientInserts.remove(tmp);
+ }
+ }
+
}
Modified:
branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
2009-02-03 18:21:42 UTC (rev 25506)
+++ branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
2009-02-03 19:12:22 UTC (rev 25507)
@@ -464,7 +464,12 @@
public synchronized Object chooseKey(KeysFetchingLocally ignored,
ObjectContainer container, ClientContext context) {
if(finished) return null;
// Ignore KeysFetchingLocally, it's for requests.
- return getBlock(container, context, false);
+ Object ret = getBlock(container, context, false);
+ if(!persistent) {
+ if(ignored.hasTransientInsert(this, ret))
+ return null;
+ }
+ return ret;
}
@Override
Modified: branches/db4o/freenet/src/freenet/node/KeysFetchingLocally.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/KeysFetchingLocally.java
2009-02-03 18:21:42 UTC (rev 25506)
+++ branches/db4o/freenet/src/freenet/node/KeysFetchingLocally.java
2009-02-03 19:12:22 UTC (rev 25507)
@@ -9,5 +9,13 @@
* LOCKING: This should be safe just about anywhere, the lock
protecting it is always taken last.
*/
public boolean hasKey(Key key);
+
+ /**
+ * Is this request:token pair being executed? This applies only to
+ * non-persistent inserts, because persistent requests are selected on
+ * a request level, and requests use hasKey(). Also, activation issues
+ * with SendableRequest meaning getting a hash code would be
problematic!
+ */
+ public boolean hasTransientInsert(SendableInsert insert, Object token);
}
Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2009-02-03 18:21:42 UTC (rev 25506)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2009-02-03 19:12:22 UTC (rev 25507)
@@ -79,4 +79,8 @@
public void start(NodeClientCore core);
+ public boolean addTransientInsertFetching(SendableInsert insert, Object
token);
+
+ public void removeTransientInsertFetching(SendableInsert insert, Object
token);
+
}
Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarter.java 2009-02-03
18:21:42 UTC (rev 25506)
+++ branches/db4o/freenet/src/freenet/node/RequestStarter.java 2009-02-03
19:12:22 UTC (rev 25507)
@@ -7,6 +7,7 @@
import freenet.client.async.ChosenBlock;
import freenet.client.async.ClientContext;
+import freenet.client.async.TransientChosenBlock;
import freenet.keys.Key;
import freenet.support.Logger;
import freenet.support.OOMHandler;
@@ -185,6 +186,9 @@
if(req.key != null) {
if(!sched.addToFetching(req.key))
return false;
+ } else if((!req.isPersistent()) &&
((TransientChosenBlock)req).request instanceof SendableInsert) {
+
if(!sched.addTransientInsertFetching((SendableInsert)(((TransientChosenBlock)req).request),
req.token))
+ return false;
}
if(logMINOR) Logger.minor(this, "Running request "+req+"
priority "+req.getPriority());
core.getExecutor().execute(new SenderThread(req, req.key),
"RequestStarter$SenderThread for "+req);
@@ -230,6 +234,9 @@
Logger.minor(this, "Finished "+req);
} finally {
if(key != null) sched.removeFetchingKey(key);
+ else if((!req.isPersistent()) &&
((TransientChosenBlock)req).request instanceof SendableInsert)
+
sched.removeTransientInsertFetching((SendableInsert)(((TransientChosenBlock)req).request),
req.token);
+
}
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs