Author: toad
Date: 2008-02-12 23:17:05 +0000 (Tue, 12 Feb 2008)
New Revision: 17845
Modified:
trunk/freenet/src/freenet/node/LocationManager.java
Log:
First implementation of swap queueing: queue incoming swap requests up to a
limit (instead of rejecting them when we can't lock), process each one after
the last one in unlock().
Modified: trunk/freenet/src/freenet/node/LocationManager.java
===================================================================
--- trunk/freenet/src/freenet/node/LocationManager.java 2008-02-12 22:57:16 UTC
(rev 17844)
+++ trunk/freenet/src/freenet/node/LocationManager.java 2008-02-12 23:17:05 UTC
(rev 17845)
@@ -6,6 +6,7 @@
import java.security.MessageDigest;
import java.util.Enumeration;
import java.util.Hashtable;
+import java.util.LinkedList;
import java.util.Vector;
import freenet.crypt.RandomSource;
@@ -607,16 +608,34 @@
/**
* Unlock the node for swapping.
* @param logSwapTime If true, log the swap time. */
- synchronized void unlock(boolean logSwapTime) {
+ void unlock(boolean logSwapTime) {
+ // Only the state update below holds the monitor; the next queued
+ // request is handled after the synchronized block has been left.
+ Message nextMessage;
+ synchronized(this) {
if(!locked)
throw new IllegalStateException("Unlocking when not locked!");
- locked = false;
long lockTime = System.currentTimeMillis() - lockedTime;
if(logMINOR) {
Logger.minor(this, "Unlocking on port "+node.getDarknetPortNumber());
Logger.minor(this, "lockTime: "+lockTime);
}
- averageSwapTime.report(lockTime);
+ // Honour logSwapTime: only record the swap time when the caller asked for it.
+ if(logSwapTime)
+ averageSwapTime.report(lockTime);
+
+ if(incomingMessageQueue.isEmpty()) {
+ // Nothing is waiting: actually release the lock.
+ locked = false;
+ return;
+ }
+
+ // Otherwise, stay locked, and start the next one from the queue.
+
+ nextMessage = (Message) incomingMessageQueue.removeFirst();
+ // Restart the clock so the next lockTime does not include the swap that just finished.
+ lockedTime = System.currentTimeMillis();
+
+ }
+
+ long oldID = nextMessage.getLong(DMT.UID);
+ long newID = oldID+1;
+ PeerNode pn = (PeerNode) nextMessage.getSource();
+
+ // NOTE(review): nothing in this method releases the lock after the queued
+ // request has been handled - confirm that innerHandleSwapRequest() (or
+ // whatever it starts) ends up calling unlock().
+ innerHandleSwapRequest(oldID, newID, pn, nextMessage);
}
/**
@@ -734,6 +753,11 @@
}
}
+ /** Queue of swap requests to handle after this one. lockOrQueue() appends
+ * incoming Messages at the tail; unlock() removes the next one from the head. */
+ private final LinkedList incomingMessageQueue = new LinkedList();
+
+ /** Queue length limit that lockOrQueue() checks incomingMessageQueue.size() against. */
+ static final int MAX_INCOMING_QUEUE_LENGTH = 10;
+
/**
* Handle an incoming SwapRequest
* @return True if we have handled the message, false if it needs
@@ -796,6 +820,7 @@
if(htl <= 0) {
if(logMINOR) Logger.minor(this, "Accepting?... "+oldID);
// Accept - handle locally
+ lockOrQueue(m, oldID, newID, pn);
if(!lock()) {
if(logMINOR) Logger.minor(this, "Can't obtain lock on "+oldID+"
- rejecting to "+pn);
// Reject
@@ -853,6 +878,48 @@
}
}
+ /**
+ * If we can obtain the lock, then execute the swap by calling innerHandleSwapRequest().
+ * If we are locked but the queue has room, queue the message; unlock() takes it from the queue later.
+ * Otherwise (locked and the queue is full), reject it.
+ * @param msg The swap request message; this is what gets queued.
+ * @param oldID ID used in the FNPSwapRejected reply and passed to innerHandleSwapRequest().
+ * @param newID Second ID passed to innerHandleSwapRequest().
+ * @param pn The peer the rejection is sent to; also passed to innerHandleSwapRequest().
+ */
+ void lockOrQueue(Message msg, long oldID, long newID, PeerNode pn) {
+ boolean runNow = false;
+ boolean reject = false;
+ synchronized(this) {
+ if(!locked) {
+ locked = true;
+ // unlock() measures the swap time from lockedTime, so start the clock here.
+ lockedTime = System.currentTimeMillis();
+ runNow = true;
+ } else if(incomingMessageQueue.size() >= MAX_INCOMING_QUEUE_LENGTH) {
+ // Locked and the queue is already at its limit: reject.
+ reject = true;
+ } else {
+ // Locked, but there is room: queue it.
+ incomingMessageQueue.addLast(msg);
+ }
+ }
+ // The network send and the swap itself happen outside the synchronized block.
+ if(reject) {
+ Message rejected = DMT.createFNPSwapRejected(oldID);
+ try {
+ pn.sendAsync(rejected, null, 0, null);
+ } catch (NotConnectedException e1) {
+ if(logMINOR) Logger.minor(this, "Lost connection rejecting SwapRequest (locked) from "+pn);
+ }
+ swapsRejectedAlreadyLocked++;
+ } else if(runNow) {
+ boolean logSwapTime = false;
+ try {
+ innerHandleSwapRequest(oldID, newID, pn, msg);
+ // Only reached if the handler did not throw.
+ logSwapTime = true;
+ } finally {
+ unlock(logSwapTime);
+ }
+ } // else it is queued.
+ }
+
private void innerHandleSwapRequest(long oldID, long newID, PeerNode pn,
Message m) {
RecentlyForwardedItem item = addForwardedItem(oldID, newID, pn, null);
// Locked, do it