Author: toad
Date: 2009-01-09 15:01:46 +0000 (Fri, 09 Jan 2009)
New Revision: 24981
Added:
trunk/freenet/src/freenet/node/InsertTag.java
trunk/freenet/src/freenet/node/OfferReplyTag.java
trunk/freenet/src/freenet/node/RequestTag.java
trunk/freenet/src/freenet/node/UIDTag.java
Modified:
trunk/freenet/src/freenet/node/CHKInsertHandler.java
trunk/freenet/src/freenet/node/FailureTable.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeClientCore.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/SSKInsertHandler.java
Log:
Debug and maybe fix the bandwidth collapse bug. Record info on a UIDTag for
each request about its status. If it takes more than 10 minutes to complete,
remove it and log the info as an error.
Modified: trunk/freenet/src/freenet/node/CHKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java 2009-01-08
22:08:28 UTC (rev 24980)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java 2009-01-09
15:01:46 UTC (rev 24981)
@@ -45,14 +45,16 @@
private BlockReceiver br;
private Thread runThread;
PartiallyReceivedBlock prb;
+ final InsertTag tag;
private static boolean logMINOR;
- CHKInsertHandler(Message req, PeerNode source, long id, Node node, long
startTime) {
+ CHKInsertHandler(Message req, PeerNode source, long id, Node node, long
startTime, InsertTag tag) {
this.req = req;
this.node = node;
this.uid = id;
this.source = source;
this.startTime = startTime;
+ this.tag = tag;
key = (NodeCHK) req.getObject(DMT.FREENET_ROUTING_KEY);
htl = req.getShort(DMT.HTL);
if(htl <= 0) htl = 1;
@@ -71,11 +73,13 @@
realRun();
} catch (OutOfMemoryError e) {
OOMHandler.handleOOM(e);
+ tag.handlerThrew(e);
} catch (Throwable t) {
Logger.error(this, "Caught in run() "+t, t);
+ tag.handlerThrew(t);
} finally {
if(logMINOR) Logger.minor(this, "Exiting CHKInsertHandler.run()
for "+uid);
- node.unlockUID(uid, false, true, false, false, false);
+ node.unlockUID(uid, false, true, false, false, false, tag);
}
}
Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java 2009-01-08 22:08:28 UTC
(rev 24980)
+++ trunk/freenet/src/freenet/node/FailureTable.java 2009-01-09 15:01:46 UTC
(rev 24981)
@@ -379,16 +379,16 @@
* @param source The node that asked for the key.
* @throws NotConnectedException If the sender ceases to be connected.
*/
- public void sendOfferedKey(final Key key, final boolean isSSK, final
boolean needPubKey, final long uid, final PeerNode source) throws
NotConnectedException {
+ public void sendOfferedKey(final Key key, final boolean isSSK, final
boolean needPubKey, final long uid, final PeerNode source, final OfferReplyTag
tag) throws NotConnectedException {
this.offerExecutor.execute(new Runnable() {
public void run() {
try {
- innerSendOfferedKey(key, isSSK,
needPubKey, uid, source);
+ innerSendOfferedKey(key, isSSK,
needPubKey, uid, source, tag);
} catch (NotConnectedException e) {
- node.unlockUID(uid, isSSK, false,
false, true, false);
+ node.unlockUID(uid, isSSK, false,
false, true, false, tag);
// Too bad.
} catch (Throwable t) {
- node.unlockUID(uid, isSSK, false,
false, true, false);
+ node.unlockUID(uid, isSSK, false,
false, true, false, tag);
Logger.error(this, "Caught "+t+"
sending offered key");
}
}
@@ -400,13 +400,13 @@
* on a separate thread. However, blocking disk I/O *should happen on
this thread*. We deliberately
* serialise it, as high latencies can otherwise result.
*/
- protected void innerSendOfferedKey(Key key, final boolean isSSK,
boolean needPubKey, final long uid, final PeerNode source) throws
NotConnectedException {
+ protected void innerSendOfferedKey(Key key, final boolean isSSK,
boolean needPubKey, final long uid, final PeerNode source, final OfferReplyTag
tag) throws NotConnectedException {
if(isSSK) {
SSKBlock block = node.fetch((NodeSSK)key, false);
if(block == null) {
// Don't have the key
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid,
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, senderCounter);
- node.unlockUID(uid, isSSK, false, false, true,
false);
+ node.unlockUID(uid, isSSK, false, false, true,
false, tag);
return;
}
@@ -433,7 +433,7 @@
} catch (SyncSendWaitedTooLongException
e) {
// Impossible
} finally {
- node.unlockUID(uid, isSSK,
false, false, true, false);
+ node.unlockUID(uid, isSSK,
false, false, true, false, tag);
}
}
@@ -452,7 +452,7 @@
if(block == null) {
// Don't have the key
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid,
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, senderCounter);
- node.unlockUID(uid, isSSK, false, false, true,
false);
+ node.unlockUID(uid, isSSK, false, false, true,
false, tag);
return;
}
Message df = DMT.createFNPCHKDataFound(uid,
block.getRawHeaders());
@@ -473,7 +473,7 @@
} catch (Throwable t) {
Logger.error(this, "Sending
offered key failed: "+t, t);
} finally {
- node.unlockUID(uid, isSSK,
false, false, true, false);
+ node.unlockUID(uid, isSSK,
false, false, true, false, tag);
}
}
Added: trunk/freenet/src/freenet/node/InsertTag.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertTag.java
(rev 0)
+++ trunk/freenet/src/freenet/node/InsertTag.java 2009-01-09 15:01:46 UTC
(rev 24981)
@@ -0,0 +1,45 @@
+package freenet.node;
+
+import freenet.support.Logger;
+import freenet.support.TimeUtil;
+
+/**
+ * Represents an insert.
+ * @author Matthew Toseland <[email protected]> (0xE43DA450)
+ */
+public class InsertTag extends UIDTag {
+
+ final boolean ssk;
+
+ enum START {
+ LOCAL,
+ REMOTE
+ }
+
+ START start;
+ private Throwable handlerThrew;
+
+ InsertTag(boolean ssk, START start) {
+ super();
+ this.start = start;
+ this.ssk = ssk;
+ }
+
+ public void handlerThrew(Throwable t) {
+ handlerThrew = t;
+ }
+
+ @Override
+ public void logStillPresent(Long uid) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("Still present after
").append(TimeUtil.formatTime(age()));
+ sb.append(" : ").append(uid).append(" : start=").append(start);
+ sb.append(" ssk=").append(ssk);
+ sb.append(" thrown=").append(handlerThrew);
+ if(handlerThrew != null)
+ Logger.error(this, sb.toString(), handlerThrew);
+ else
+ Logger.error(this, sb.toString());
+ }
+
+}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2009-01-08 22:08:28 UTC (rev
24980)
+++ trunk/freenet/src/freenet/node/Node.java 2009-01-09 15:01:46 UTC (rev
24981)
@@ -365,17 +365,17 @@
public boolean disableHangCheckers;
/** HashSet of currently running request UIDs */
- private final HashSet<Long> runningUIDs;
- private final HashSet<Long> runningCHKGetUIDs;
- private final HashSet<Long> runningLocalCHKGetUIDs;
- private final HashSet<Long> runningSSKGetUIDs;
- private final HashSet<Long> runningLocalSSKGetUIDs;
- private final HashSet<Long> runningCHKPutUIDs;
- private final HashSet<Long> runningLocalCHKPutUIDs;
- private final HashSet<Long> runningSSKPutUIDs;
- private final HashSet<Long> runningLocalSSKPutUIDs;
- private final HashSet<Long> runningCHKOfferReplyUIDs;
- private final HashSet<Long> runningSSKOfferReplyUIDs;
+ private final HashMap<Long,UIDTag> runningUIDs;
+ private final HashMap<Long,RequestTag> runningCHKGetUIDs;
+ private final HashMap<Long,RequestTag> runningLocalCHKGetUIDs;
+ private final HashMap<Long,RequestTag> runningSSKGetUIDs;
+ private final HashMap<Long,RequestTag> runningLocalSSKGetUIDs;
+ private final HashMap<Long,InsertTag> runningCHKPutUIDs;
+ private final HashMap<Long,InsertTag> runningLocalCHKPutUIDs;
+ private final HashMap<Long,InsertTag> runningSSKPutUIDs;
+ private final HashMap<Long,InsertTag> runningLocalSSKPutUIDs;
+ private final HashMap<Long,OfferReplyTag> runningCHKOfferReplyUIDs;
+ private final HashMap<Long,OfferReplyTag> runningSSKOfferReplyUIDs;
/** Semi-unique ID for swap requests. Used to identify us so that the
* topology can be reconstructed. */
@@ -856,17 +856,17 @@
transferringRequestSenders = new HashMap<NodeCHK,
RequestSender>();
transferringRequestHandlers = new HashSet<Long>();
insertSenders = new HashMap<KeyHTLPair, AnyInsertSender>();
- runningUIDs = new HashSet<Long>();
- runningCHKGetUIDs = new HashSet<Long>();
- runningLocalCHKGetUIDs = new HashSet<Long>();
- runningSSKGetUIDs = new HashSet<Long>();
- runningLocalSSKGetUIDs = new HashSet<Long>();
- runningCHKPutUIDs = new HashSet<Long>();
- runningLocalCHKPutUIDs = new HashSet<Long>();
- runningSSKPutUIDs = new HashSet<Long>();
- runningLocalSSKPutUIDs = new HashSet<Long>();
- runningCHKOfferReplyUIDs = new HashSet<Long>();
- runningSSKOfferReplyUIDs = new HashSet<Long>();
+ runningUIDs = new HashMap<Long,UIDTag>();
+ runningCHKGetUIDs = new HashMap<Long,RequestTag>();
+ runningLocalCHKGetUIDs = new HashMap<Long,RequestTag>();
+ runningSSKGetUIDs = new HashMap<Long,RequestTag>();
+ runningLocalSSKGetUIDs = new HashMap<Long,RequestTag>();
+ runningCHKPutUIDs = new HashMap<Long,InsertTag>();
+ runningLocalCHKPutUIDs = new HashMap<Long,InsertTag>();
+ runningSSKPutUIDs = new HashMap<Long,InsertTag>();
+ runningLocalSSKPutUIDs = new HashMap<Long,InsertTag>();
+ runningCHKOfferReplyUIDs = new HashMap<Long,OfferReplyTag>();
+ runningSSKOfferReplyUIDs = new HashMap<Long,OfferReplyTag>();
this.securityLevels = new SecurityLevels(this, config);
@@ -2274,6 +2274,8 @@
this.clientCore.start(config);
+ startDeadUIDChecker();
+
// After everything has been created, write the config file
back to disk.
if(config instanceof FreenetFilePersistentConfig) {
FreenetFilePersistentConfig cfg =
(FreenetFilePersistentConfig) config;
@@ -2941,55 +2943,137 @@
return is;
}
- public boolean lockUID(long uid, boolean ssk, boolean insert, boolean
offerReply, boolean local) {
+ public boolean lockUID(long uid, boolean ssk, boolean insert, boolean
offerReply, boolean local, UIDTag tag) {
synchronized(runningUIDs) {
- if(!runningUIDs.add(uid)) {
- // Already present.
- return false;
- }
+ if(runningUIDs.containsKey(uid)) return false; //
Already present.
+ runningUIDs.put(uid, tag);
}
// If these are switched around, we must remember to remove
from both.
- HashSet<Long> set = getUIDTracker(ssk, insert, offerReply,
local);
- synchronized(set) {
- if(logMINOR) Logger.minor(this, "Locking "+uid+"
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+"
size="+set.size());
- set.add(uid);
- if(logMINOR) Logger.minor(this, "Locked "+uid+"
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+"
size="+set.size());
+ if(offerReply) {
+ HashMap<Long,OfferReplyTag> map = getOfferTracker(ssk);
+ innerLock(map, (OfferReplyTag)tag, uid, ssk, insert,
offerReply, local);
+ } else if(insert) {
+ HashMap<Long,InsertTag> map =
getInsertTracker(ssk,local);
+ innerLock(map, (InsertTag)tag, uid, ssk, insert,
offerReply, local);
+ } else {
+ HashMap<Long,RequestTag> map =
getRequestTracker(ssk,local);
+ innerLock(map, (RequestTag)tag, uid, ssk, insert,
offerReply, local);
}
return true;
}
- public void unlockUID(long uid, boolean ssk, boolean insert, boolean
canFail, boolean offerReply, boolean local) {
+ private<T extends UIDTag> void innerLock(HashMap<Long, T> map, T tag,
Long uid, boolean ssk, boolean insert, boolean offerReply, boolean local) {
+ synchronized(map) {
+ if(logMINOR) Logger.minor(this, "Locking "+uid+"
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+"
size="+map.size());
+ if(map.containsKey(uid)) {
+ Logger.error(this, "Already have UID in
specific map ("+ssk+","+insert+","+offerReply+","+local+") but not in general
map: trying to register "+tag+" but already have "+map.get(uid));
+ }
+ map.put(uid, tag);
+ if(logMINOR) Logger.minor(this, "Locked "+uid+"
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+"
size="+map.size());
+ }
+ }
+
+ /**
+ * Release a UID previously registered with lockUID(): the tag is removed
+ * from the type-specific tracker first, then from the general runningUIDs map.
+ * @param canFail If true, the UID may already have been unlocked (some
+ * callers can legitimately unlock twice), so a missing entry is not an error.
+ * @param tag The tag that was passed to lockUID() for this UID; compared by identity.
+ * @throws IllegalStateException If the UID is not registered and canFail is false.
+ */
+ public void unlockUID(long uid, boolean ssk, boolean insert, boolean
canFail, boolean offerReply, boolean local, UIDTag tag) {
completed(uid);
- HashSet<Long> set = getUIDTracker(ssk, insert, offerReply,
local);
- synchronized(set) {
- if(logMINOR) Logger.minor(this, "Unlocking "+uid+"
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+", local="+local+"
size="+set.size());
- set.remove(uid);
- if(logMINOR) Logger.minor(this, "Unlocked "+uid+"
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+", local="+local+"
size="+set.size());
+
+ if(offerReply) {
+ HashMap<Long,OfferReplyTag> map = getOfferTracker(ssk);
+ innerUnlock(map, (OfferReplyTag)tag, uid, ssk, insert,
offerReply, local);
+ } else if(insert) {
+ HashMap<Long,InsertTag> map =
getInsertTracker(ssk,local);
+ innerUnlock(map, (InsertTag)tag, uid, ssk, insert,
offerReply, local);
+ } else {
+ HashMap<Long,RequestTag> map =
getRequestTracker(ssk,local);
+ innerUnlock(map, (RequestTag)tag, uid, ssk, insert,
offerReply, local);
}
+
synchronized(runningUIDs) {
- if(!runningUIDs.remove(uid) && !canFail)
+ UIDTag oldTag = runningUIDs.get(uid);
+ if(oldTag == null) {
+ // Preserve the pre-tag contract: an already-unlocked UID is only
+ // an error when the caller did not allow for it.
+ if(canFail) return;
throw new IllegalStateException("Could not
unlock "+uid+ '!');
+ } else if(tag != oldTag) {
+ // Report the tag that is actually registered, not the caller's tag twice.
+ Logger.error(this, "Removing "+tag+" for
"+uid+" but "+oldTag+" is registered!");
+ return;
+ } else {
+ runningUIDs.remove(uid);
+ }
}
}
- private HashSet<Long> getUIDTracker(boolean ssk, boolean insert,
boolean offerReply, boolean local) {
+	/**
+	 * Remove a tag from one of the type-specific trackers. The entry is only
+	 * removed when the map currently holds this exact tag (identity
+	 * comparison) for the UID; otherwise an error is logged and the map is
+	 * left untouched.
+	 */
+	private<T extends UIDTag> void innerUnlock(HashMap<Long, T> map, T tag, Long uid, boolean ssk, boolean insert, boolean offerReply, boolean local) {
+		synchronized(map) {
+			// These messages previously said "Locking"/"Locked", which made unlocks
+			// indistinguishable from locks in the minor log.
+			if(logMINOR) Logger.minor(this, "Unlocking "+uid+" ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+" size="+map.size());
+			T registered = map.get(uid);
+			if(registered != tag)
+				Logger.error(this, "Removing "+tag+" for "+uid+" returned "+registered);
+			else
+				map.remove(uid);
+			if(logMINOR) Logger.minor(this, "Unlocked "+uid+" ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+" size="+map.size());
+		}
+	}
+
+ private HashMap<Long, RequestTag> getRequestTracker(boolean ssk,
boolean local) {
if(ssk) {
- if(offerReply)
- return runningSSKOfferReplyUIDs;
- if(!local)
- return insert ? runningSSKPutUIDs :
runningSSKGetUIDs;
- else
- return insert ? runningLocalSSKPutUIDs :
runningLocalSSKGetUIDs;
+ return local ? runningLocalSSKGetUIDs :
runningSSKGetUIDs;
} else {
- if(offerReply)
- return runningCHKOfferReplyUIDs;
- if(!local)
- return insert ? runningCHKPutUIDs :
runningCHKGetUIDs;
- else
- return insert ? runningLocalCHKPutUIDs :
runningLocalCHKGetUIDs;
+ return local ? runningLocalCHKGetUIDs :
runningCHKGetUIDs;
}
}
+
+	/**
+	 * Select the tracker map for inserts of the given key type (SSK or CHK)
+	 * and origin (local or not).
+	 */
+	private HashMap<Long, InsertTag> getInsertTracker(boolean ssk, boolean local) {
+		if(local)
+			return ssk ? runningLocalSSKPutUIDs : runningLocalCHKPutUIDs;
+		return ssk ? runningSSKPutUIDs : runningCHKPutUIDs;
+	}
+
+	/** Select the tracker map for offer replies of the given key type (SSK or CHK). */
+	private HashMap<Long, OfferReplyTag> getOfferTracker(boolean ssk) {
+		if(ssk)
+			return runningSSKOfferReplyUIDs;
+		return runningCHKOfferReplyUIDs;
+	}
+
+	/**
+	 * Schedule the first run of deadUIDChecker. The checker re-queues itself
+	 * after every run, so this only needs to be called once at startup.
+	 */
+	private void startDeadUIDChecker() {
+		// Was an empty generated stub, so the checker never ran at all.
+		getTicker().queueTimedJob(deadUIDChecker, 60*1000);
+	}
+
+
+	/**
+	 * Periodic job which scans every type-specific UID tracker for tags
+	 * created more than 10 minutes ago, logs each one as an error and removes
+	 * it from that tracker. Re-queues itself 60 seconds after each run, even
+	 * if the scan throws.
+	 */
+	private Runnable deadUIDChecker = new Runnable() {
+		public void run() {
+			try {
+				checkUIDs(runningLocalSSKGetUIDs);
+				checkUIDs(runningLocalCHKGetUIDs);
+				checkUIDs(runningLocalSSKPutUIDs);
+				checkUIDs(runningLocalCHKPutUIDs);
+				checkUIDs(runningSSKGetUIDs);
+				checkUIDs(runningCHKGetUIDs);
+				checkUIDs(runningSSKPutUIDs);
+				checkUIDs(runningCHKPutUIDs);
+				checkUIDs(runningSSKOfferReplyUIDs);
+				checkUIDs(runningCHKOfferReplyUIDs);
+			} finally {
+				getTicker().queueTimedJob(this, 60*1000);
+			}
+		}
+
+		/**
+		 * Snapshot one tracker under its lock, then report and remove every
+		 * tag older than 10 minutes.
+		 */
+		private void checkUIDs(HashMap<Long, ? extends UIDTag> map) {
+			Long[] uids;
+			UIDTag[] tags;
+			synchronized(map) {
+				// Both arrays are taken under the same lock without intervening
+				// modification, so index i of each refers to the same entry.
+				uids = map.keySet().toArray(new Long[map.size()]);
+				tags = map.values().toArray(new UIDTag[map.size()]);
+			}
+			long now = System.currentTimeMillis();
+			for(int i=0;i<uids.length;i++) {
+				if(now - tags[i].createdTime <= 10 * 60 * 1000) continue;
+				try {
+					tags[i].logStillPresent(uids[i]);
+				} catch (RuntimeException e) {
+					// A failure while building the report must not stop the stale
+					// entry being removed, nor abort the rest of the scan.
+					Logger.error(this, "Caught "+e+" logging stale UID "+uids[i], e);
+				}
+				synchronized(map) {
+					// The lock was released after the snapshot: only remove the entry
+					// if the UID is still mapped to the tag we reported.
+					if(map.get(uids[i]) == tags[i])
+						map.remove(uids[i]);
+				}
+			}
+		}
+	};
+
+
/**
* @return Some status information.
*/
@@ -3915,7 +3999,7 @@
public void addRunningUIDs(Vector<Long> list) {
synchronized(runningUIDs) {
- list.addAll(runningUIDs);
+ list.addAll(runningUIDs.keySet());
}
}
Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java 2009-01-08 22:08:28 UTC
(rev 24980)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java 2009-01-09 15:01:46 UTC
(rev 24981)
@@ -534,7 +534,8 @@
public void asyncGet(Key key, boolean cache, boolean offersOnly, final
SimpleRequestSenderCompletionListener listener) {
final long uid = random.nextLong();
final boolean isSSK = key instanceof NodeSSK;
- if(!node.lockUID(uid, isSSK, false, false, true)) {
+ final RequestTag tag = new RequestTag(isSSK,
RequestTag.START.ASYNC_GET);
+ if(!node.lockUID(uid, isSSK, false, false, true, tag)) {
Logger.error(this, "Could not lock UID just randomly
generated: " + uid + " - probably indicates broken PRNG");
return;
}
@@ -550,11 +551,12 @@
public void onRequestSenderFinished(int status) {
// If transfer coalescing has happened, we may
have already unlocked.
- node.unlockUID(uid, isSSK, false, true, false,
true);
+ node.unlockUID(uid, isSSK, false, true, false,
true, tag);
+ tag.setRequestSenderFinished(status);
if(listener != null)
listener.completed(status ==
RequestSender.SUCCESS);
}
- });
+ }, tag);
}
/**
@@ -563,26 +565,28 @@
* anything and will run asynchronously. Caller is responsible for
unlocking the UID.
* @param key
*/
- void asyncGet(Key key, boolean cache, boolean offersOnly, long uid,
RequestSender.Listener listener) {
+ void asyncGet(Key key, boolean cache, boolean offersOnly, long uid,
RequestSender.Listener listener, RequestTag tag) {
try {
Object o = node.makeRequestSender(key, node.maxHTL(),
uid, null, false, cache, false, offersOnly);
if(o instanceof KeyBlock) {
- node.unlockUID(uid, false, false, true, false,
true);
+ tag.servedFromDatastore = true;
+ node.unlockUID(uid, false, false, true, false,
true, tag);
return; // Already have it.
}
RequestSender rs = (RequestSender) o;
+ tag.setSender(rs);
rs.addListener(listener);
if(rs.uid != uid)
- node.unlockUID(uid, false, false, false, false,
true);
+ node.unlockUID(uid, false, false, false, false,
true, tag);
// Else it has started a request.
if(logMINOR)
Logger.minor(this, "Started " + o + " for " +
uid + " for " + key);
} catch(RuntimeException e) {
Logger.error(this, "Caught error trying to start
request: " + e, e);
- node.unlockUID(uid, false, false, true, false, true);
+ node.unlockUID(uid, false, false, true, false, true,
tag);
} catch(Error e) {
Logger.error(this, "Caught error trying to start
request: " + e, e);
- node.unlockUID(uid, false, false, true, false, true);
+ node.unlockUID(uid, false, false, true, false, true,
tag);
}
}
@@ -599,7 +603,8 @@
logMINOR = Logger.shouldLog(Logger.MINOR, this);
long startTime = System.currentTimeMillis();
long uid = random.nextLong();
- if(!node.lockUID(uid, false, false, false, true)) {
+ RequestTag tag = new RequestTag(false, RequestTag.START.LOCAL);
+ if(!node.lockUID(uid, false, false, false, true, tag)) {
Logger.error(this, "Could not lock UID just randomly
generated: " + uid + " - probably indicates broken PRNG");
throw new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
}
@@ -607,6 +612,7 @@
Object o = node.makeRequestSender(key.getNodeCHK(),
node.maxHTL(), uid, null, localOnly, cache, ignoreStore, false);
if(o instanceof CHKBlock)
try {
+ tag.setServedFromDatastore();
return new ClientCHKBlock((CHKBlock) o,
key);
} catch(CHKVerifyException e) {
Logger.error(this, "Does not verify: "
+ e, e);
@@ -703,7 +709,7 @@
}
}
} finally {
- node.unlockUID(uid, false, false, true, false, true);
+ node.unlockUID(uid, false, false, true, false, true,
tag);
}
}
@@ -711,7 +717,8 @@
logMINOR = Logger.shouldLog(Logger.MINOR, this);
long startTime = System.currentTimeMillis();
long uid = random.nextLong();
- if(!node.lockUID(uid, true, false, false, true)) {
+ RequestTag tag = new RequestTag(true, RequestTag.START.LOCAL);
+ if(!node.lockUID(uid, true, false, false, true, tag)) {
Logger.error(this, "Could not lock UID just randomly
generated: " + uid + " - probably indicates broken PRNG");
throw new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
}
@@ -719,6 +726,7 @@
Object o = node.makeRequestSender(key.getNodeKey(),
node.maxHTL(), uid, null, localOnly, cache, ignoreStore, false);
if(o instanceof SSKBlock)
try {
+ tag.setServedFromDatastore();
SSKBlock block = (SSKBlock) o;
key.setPublicKey(block.getPubKey());
return ClientSSKBlock.construct(block,
key);
@@ -814,7 +822,7 @@
}
}
} finally {
- node.unlockUID(uid, true, false, true, false, true);
+ node.unlockUID(uid, true, false, true, false, true,
tag);
}
}
@@ -834,7 +842,8 @@
PartiallyReceivedBlock prb = new
PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE, data);
CHKInsertSender is;
long uid = random.nextLong();
- if(!node.lockUID(uid, false, true, false, true)) {
+ InsertTag tag = new InsertTag(false, InsertTag.START.LOCAL);
+ if(!node.lockUID(uid, false, true, false, true, tag)) {
Logger.error(this, "Could not lock UID just randomly
generated: " + uid + " - probably indicates broken PRNG");
throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
}
@@ -940,7 +949,7 @@
}
}
} finally {
- node.unlockUID(uid, false, true, true, false, true);
+ node.unlockUID(uid, false, true, true, false, true,
tag);
}
}
@@ -948,7 +957,8 @@
logMINOR = Logger.shouldLog(Logger.MINOR, this);
SSKInsertSender is;
long uid = random.nextLong();
- if(!node.lockUID(uid, true, true, false, true)) {
+ InsertTag tag = new InsertTag(true, InsertTag.START.LOCAL);
+ if(!node.lockUID(uid, true, true, false, true, tag)) {
Logger.error(this, "Could not lock UID just randomly
generated: " + uid + " - probably indicates broken PRNG");
throw new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
}
@@ -1066,7 +1076,7 @@
}
}
} finally {
- node.unlockUID(uid, true, true, true, false, true);
+ node.unlockUID(uid, true, true, true, false, true, tag);
}
}
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2009-01-08 22:08:28 UTC
(rev 24980)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2009-01-09 15:01:46 UTC
(rev 24981)
@@ -261,7 +261,8 @@
// Do we want it? We can RejectOverload if we don't have the
bandwidth...
boolean isSSK = key instanceof NodeSSK;
- node.lockUID(uid, isSSK, false, true, false);
+ OfferReplyTag tag = new OfferReplyTag(isSSK);
+ node.lockUID(uid, isSSK, false, true, false, tag);
boolean needPubKey;
try {
needPubKey = m.getBoolean(DMT.NEED_PUB_KEY);
@@ -275,22 +276,22 @@
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload) data
request from "+source.getPeer()+": "+e);
}
- node.unlockUID(uid, isSSK, false, false, true, false);
+ node.unlockUID(uid, isSSK, false, false, true, false,
tag);
return true;
}
} catch (Error e) {
- node.unlockUID(uid, isSSK, false, false, true, false);
+ node.unlockUID(uid, isSSK, false, false, true, false,
tag);
throw e;
} catch (RuntimeException e) {
- node.unlockUID(uid, isSSK, false, false, true, false);
+ node.unlockUID(uid, isSSK, false, false, true, false,
tag);
throw e;
} // Otherwise, sendOfferedKey is responsible for unlocking.
// Accept it.
try {
- node.failureTable.sendOfferedKey(key, isSSK,
needPubKey, uid, source);
+ node.failureTable.sendOfferedKey(key, isSSK,
needPubKey, uid, source, tag);
} catch (NotConnectedException e) {
// Too bad.
}
@@ -354,7 +355,8 @@
}
short htl = m.getShort(DMT.HTL);
Key key = (Key) m.getObject(DMT.FREENET_ROUTING_KEY);
- if(!node.lockUID(id, isSSK, false, false, false)) {
+ final RequestTag tag = new RequestTag(isSSK, RequestTag.START.REMOTE);
+ if(!node.lockUID(id, isSSK, false, false, false, tag)) {
if(logMINOR) Logger.minor(this, "Could not lock ID
"+id+" -> rejecting (already running)");
Message rejected = DMT.createFNPRejectedLoop(id);
try {
@@ -377,7 +379,8 @@
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload) data
request from "+source.getPeer()+": "+e);
}
- node.unlockUID(id, isSSK, false, false, false, false);
+ tag.setRejected();
+ node.unlockUID(id, isSSK, false, false, false, false,
tag);
// Do not tell failure table.
// Otherwise an attacker can flood us with requests
very cheaply and purge our
// failure table even though we didn't accept any of
them.
@@ -385,7 +388,7 @@
}
nodeStats.reportIncomingRequestLocation(key.toNormalizedDouble());
//if(!node.lockUID(id)) return false;
- RequestHandler rh = new RequestHandler(m, source, id, node,
htl, key);
+ RequestHandler rh = new RequestHandler(m, source, id, node,
htl, key, tag);
node.executor.execute(rh, "RequestHandler for UID "+id+" on
"+node.getDarknetPortNumber());
return true;
}
@@ -402,7 +405,8 @@
}
return true;
}
- if(!node.lockUID(id, isSSK, true, false, false)) {
+ InsertTag tag = new InsertTag(isSSK, InsertTag.START.REMOTE);
+ if(!node.lockUID(id, isSSK, true, false, false, tag)) {
if(logMINOR) Logger.minor(this, "Could not lock ID
"+id+" -> rejecting (already running)");
Message rejected = DMT.createFNPRejectedLoop(id);
try {
@@ -422,7 +426,7 @@
} catch (NotConnectedException e) {
Logger.normal(this, "Rejecting (overload)
insert request from "+source.getPeer()+": "+e);
}
- node.unlockUID(id, isSSK, true, false, false, false);
+ node.unlockUID(id, isSSK, true, false, false, false,
tag);
return true;
}
long now = System.currentTimeMillis();
@@ -431,17 +435,17 @@
byte[] data = ((ShortBuffer) m.getObject(DMT.DATA)).getData();
byte[] headers = ((ShortBuffer)
m.getObject(DMT.BLOCK_HEADERS)).getData();
short htl = m.getShort(DMT.HTL);
- SSKInsertHandler rh = new SSKInsertHandler(key, data,
headers, htl, source, id, node, now);
+ SSKInsertHandler rh = new SSKInsertHandler(key, data,
headers, htl, source, id, node, now, tag);
rh.receivedBytes(m.receivedByteCount());
node.executor.execute(rh, "SSKInsertHandler for "+id+"
on "+node.getDarknetPortNumber());
} else if(m.getSpec().equals(DMT.FNPSSKInsertRequestNew)) {
NodeSSK key = (NodeSSK)
m.getObject(DMT.FREENET_ROUTING_KEY);
short htl = m.getShort(DMT.HTL);
- SSKInsertHandler rh = new SSKInsertHandler(key, null,
null, htl, source, id, node, now);
+ SSKInsertHandler rh = new SSKInsertHandler(key, null,
null, htl, source, id, node, now, tag);
rh.receivedBytes(m.receivedByteCount());
node.executor.execute(rh, "SSKInsertHandler for "+id+"
on "+node.getDarknetPortNumber());
} else {
- CHKInsertHandler rh = new CHKInsertHandler(m, source,
id, node, now);
+ CHKInsertHandler rh = new CHKInsertHandler(m, source,
id, node, now, tag);
node.executor.execute(rh, "CHKInsertHandler for "+id+"
on "+node.getDarknetPortNumber());
}
if(logMINOR) Logger.minor(this, "Started InsertHandler for
"+id);
Added: trunk/freenet/src/freenet/node/OfferReplyTag.java
===================================================================
--- trunk/freenet/src/freenet/node/OfferReplyTag.java
(rev 0)
+++ trunk/freenet/src/freenet/node/OfferReplyTag.java 2009-01-09 15:01:46 UTC
(rev 24981)
@@ -0,0 +1,27 @@
+package freenet.node;
+
+import freenet.support.Logger;
+import freenet.support.TimeUtil;
+
+/**
+ * Tag tracking an offer reply.
+ * @author Matthew Toseland <[email protected]> (0xE43DA450)
+ */
+public class OfferReplyTag extends UIDTag {
+
+	/** True if the offered key is an SSK, false if it is a CHK. */
+	final boolean ssk;
+
+	public OfferReplyTag(boolean isSSK) {
+		super();
+		ssk = isSSK;
+	}
+
+	/**
+	 * Log, at error level, that this offer reply is still registered.
+	 * @param uid The UID this tag is registered under; included in the
+	 * message so the entry can be identified, as the other tag types do.
+	 */
+	@Override
+	public void logStillPresent(Long uid) {
+		StringBuilder sb = new StringBuilder();
+		sb.append("Still present after ").append(TimeUtil.formatTime(age()));
+		sb.append(" : ").append(uid).append(" : ssk=").append(ssk);
+		Logger.error(this, sb.toString());
+	}
+
+}
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2009-01-08 22:08:28 UTC
(rev 24980)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2009-01-09 15:01:46 UTC
(rev 24981)
@@ -49,18 +49,20 @@
private long searchStartTime;
private long responseDeadline;
private BlockTransmitter bt;
+ private final RequestTag tag;
@Override
public String toString() {
return super.toString() + " for " + uid;
}
- public RequestHandler(Message m, PeerNode source, long id, Node n,
short htl, Key key) {
+ public RequestHandler(Message m, PeerNode source, long id, Node n,
short htl, Key key, RequestTag tag) {
req = m;
node = n;
uid = id;
this.source = source;
this.htl = htl;
+ this.tag = tag;
if(htl <= 0)
htl = 1;
this.key = key;
@@ -78,11 +80,13 @@
} catch(NotConnectedException e) {
Logger.normal(this, "requestor gone, could not start
request handler wait");
node.removeTransferringRequestHandler(uid);
- node.unlockUID(uid, key instanceof NodeSSK, false,
false, false, false);
+ tag.handlerThrew(e);
+ node.unlockUID(uid, key instanceof NodeSSK, false,
false, false, false, tag);
} catch(Throwable t) {
Logger.error(this, "Caught " + t, t);
node.removeTransferringRequestHandler(uid);
- node.unlockUID(uid, key instanceof NodeSSK, false,
false, false, false);
+ tag.handlerThrew(t);
+ node.unlockUID(uid, key instanceof NodeSSK, false,
false, false, false, tag);
}
}
private Exception previousApplyByteCountCall;
@@ -140,6 +144,7 @@
Object o = node.makeRequestSender(key, htl, uid, source, false,
true, false, false);
if(o instanceof KeyBlock) {
+ tag.setServedFromDatastore();
returnLocalData((KeyBlock) o);
return;
}
@@ -436,7 +441,7 @@
private void unregisterRequestHandlerWithNode() {
node.removeTransferringRequestHandler(uid);
- node.unlockUID(uid, key instanceof NodeSSK, false, false,
false, false);
+ node.unlockUID(uid, key instanceof NodeSSK, false, false,
false, false, tag);
}
/**
Added: trunk/freenet/src/freenet/node/RequestTag.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestTag.java
(rev 0)
+++ trunk/freenet/src/freenet/node/RequestTag.java 2009-01-09 15:01:46 UTC
(rev 24981)
@@ -0,0 +1,78 @@
+package freenet.node;
+
+import java.lang.ref.WeakReference;
+
+import freenet.io.comm.NotConnectedException;
+import freenet.support.Logger;
+import freenet.support.TimeUtil;
+
+/**
+ * Tag for a request.
+ * @author Matthew Toseland <[email protected]> (0xE43DA450)
+ */
+public class RequestTag extends UIDTag {
+
+	/** How the request was started. */
+	enum START {
+		ASYNC_GET,
+		LOCAL,
+		REMOTE
+	}
+
+	final START start;
+	final boolean isSSK;
+	boolean servedFromDatastore;
+	/** Weak reference to the sender, or null if setSender() was never called. */
+	WeakReference<RequestSender> sender;
+	int requestSenderFinishedCode;
+	Throwable handlerThrew;
+	boolean rejected;
+
+	public RequestTag(boolean isSSK, START start) {
+		super();
+		this.start = start;
+		this.isSSK = isSSK;
+	}
+
+	/** Record the status code the request sender finished with. */
+	public void setRequestSenderFinished(int status) {
+		requestSenderFinishedCode = status;
+	}
+
+	/** Record the sender handling this request; only a weak reference is kept. */
+	public void setSender(RequestSender rs) {
+		sender = new WeakReference<RequestSender>(rs);
+	}
+
+	/** Record a throwable caught by the handler, for inclusion in the report. */
+	public void handlerThrew(Throwable t) {
+		this.handlerThrew = t;
+	}
+
+	/** Mark the request as answered from the local datastore. */
+	public void setServedFromDatastore() {
+		servedFromDatastore = true;
+	}
+
+	/** Mark the request as rejected. */
+	public void setRejected() {
+		rejected = true;
+	}
+
+	/**
+	 * Log, at error level, that this request is still registered, together
+	 * with everything recorded about it. The recorded throwable, if any, is
+	 * attached to the log entry.
+	 * @param uid The UID this tag is registered under.
+	 */
+	@Override
+	public void logStillPresent(Long uid) {
+		StringBuilder sb = new StringBuilder();
+		sb.append("Still present after ").append(TimeUtil.formatTime(age()));
+		sb.append(" : ").append(uid).append(" : start=").append(start);
+		sb.append(" ssk=").append(isSSK).append(" from store=").append(servedFromDatastore);
+		// sender is only assigned by setSender(); requests that were rejected
+		// or served from the store never get one, so guard against null here.
+		WeakReference<RequestSender> ref = sender;
+		RequestSender s = (ref == null) ? null : ref.get();
+		if(s == null) {
+			sb.append(" sender=null");
+		} else {
+			sb.append(" sender=").append(s);
+			sb.append(" status=");
+			sb.append(s.getStatusString());
+		}
+		sb.append(" finishedCode=").append(requestSenderFinishedCode);
+		sb.append(" rejected=").append(rejected);
+		sb.append(" thrown=").append(handlerThrew);
+		if(handlerThrew != null)
+			Logger.error(this, sb.toString(), handlerThrew);
+		else
+			Logger.error(this, sb.toString());
+	}
+
+}
Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java 2009-01-08
22:08:28 UTC (rev 24980)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java 2009-01-09
15:01:46 UTC (rev 24981)
@@ -45,8 +45,9 @@
private byte[] data;
private byte[] headers;
private boolean canCommit;
+ final InsertTag tag;
- SSKInsertHandler(NodeSSK key, byte[] data, byte[] headers, short htl,
PeerNode source, long id, Node node, long startTime) {
+ SSKInsertHandler(NodeSSK key, byte[] data, byte[] headers, short htl,
PeerNode source, long id, Node node, long startTime, InsertTag tag) {
this.node = node;
this.uid = id;
this.source = source;
@@ -55,6 +56,7 @@
this.htl = htl;
this.data = data;
this.headers = headers;
+ this.tag = tag;
if(htl <= 0) htl = 1;
byte[] pubKeyHash = key.getPubKeyHash();
pubKey = node.getKey(pubKeyHash);
@@ -77,7 +79,7 @@
Logger.error(this, "Caught "+t, t);
} finally {
if(logMINOR) Logger.minor(this, "Exiting InsertHandler.run() for
"+uid);
- node.unlockUID(uid, true, true, false, false, false);
+ node.unlockUID(uid, true, true, false, false, false, tag);
}
}
Added: trunk/freenet/src/freenet/node/UIDTag.java
===================================================================
--- trunk/freenet/src/freenet/node/UIDTag.java (rev 0)
+++ trunk/freenet/src/freenet/node/UIDTag.java 2009-01-09 15:01:46 UTC (rev
24981)
@@ -0,0 +1,23 @@
+package freenet.node;
+
+/**
+ * Base class for tags representing a running request. These store enough
information
+ * to detect whether they are finished; if they are still in the list, this
normally
+ * represents a bug.
+ * @author Matthew Toseland <[email protected]> (0xE43DA450)
+ */
+public abstract class UIDTag {
+
+ final long createdTime;
+
+ UIDTag() {
+ createdTime = System.currentTimeMillis();
+ }
+
+ public abstract void logStillPresent(Long uid);
+
+ long age() {
+ return System.currentTimeMillis() - createdTime;
+ }
+
+}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs