Author: toad
Date: 2009-01-09 15:01:46 +0000 (Fri, 09 Jan 2009)
New Revision: 24981

Added:
   trunk/freenet/src/freenet/node/InsertTag.java
   trunk/freenet/src/freenet/node/OfferReplyTag.java
   trunk/freenet/src/freenet/node/RequestTag.java
   trunk/freenet/src/freenet/node/UIDTag.java
Modified:
   trunk/freenet/src/freenet/node/CHKInsertHandler.java
   trunk/freenet/src/freenet/node/FailureTable.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeClientCore.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/SSKInsertHandler.java
Log:
Debug and maybe fix the bandwidth collapse bug. Record info on a UIDTag for 
each request about its status. If it takes more than 10 minutes to complete, 
remove it and log the info as an error.


Modified: trunk/freenet/src/freenet/node/CHKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java        2009-01-08 
22:08:28 UTC (rev 24980)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java        2009-01-09 
15:01:46 UTC (rev 24981)
@@ -45,14 +45,16 @@
     private BlockReceiver br;
     private Thread runThread;
     PartiallyReceivedBlock prb;
+    final InsertTag tag;
     private static boolean logMINOR;
     
-    CHKInsertHandler(Message req, PeerNode source, long id, Node node, long 
startTime) {
+    CHKInsertHandler(Message req, PeerNode source, long id, Node node, long 
startTime, InsertTag tag) {
         this.req = req;
         this.node = node;
         this.uid = id;
         this.source = source;
         this.startTime = startTime;
+        this.tag = tag;
         key = (NodeCHK) req.getObject(DMT.FREENET_ROUTING_KEY);
         htl = req.getShort(DMT.HTL);
         if(htl <= 0) htl = 1;
@@ -71,11 +73,13 @@
                realRun();
                } catch (OutOfMemoryError e) {
                        OOMHandler.handleOOM(e);
+                       tag.handlerThrew(e);
         } catch (Throwable t) {
             Logger.error(this, "Caught in run() "+t, t);
+            tag.handlerThrew(t);
         } finally {
                if(logMINOR) Logger.minor(this, "Exiting CHKInsertHandler.run() 
for "+uid);
-            node.unlockUID(uid, false, true, false, false, false);
+            node.unlockUID(uid, false, true, false, false, false, tag);
         }
     }
 

Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java    2009-01-08 22:08:28 UTC 
(rev 24980)
+++ trunk/freenet/src/freenet/node/FailureTable.java    2009-01-09 15:01:46 UTC 
(rev 24981)
@@ -379,16 +379,16 @@
         * @param source The node that asked for the key.
         * @throws NotConnectedException If the sender ceases to be connected.
         */
-       public void sendOfferedKey(final Key key, final boolean isSSK, final 
boolean needPubKey, final long uid, final PeerNode source) throws 
NotConnectedException {
+       public void sendOfferedKey(final Key key, final boolean isSSK, final 
boolean needPubKey, final long uid, final PeerNode source, final OfferReplyTag 
tag) throws NotConnectedException {
                this.offerExecutor.execute(new Runnable() {
                        public void run() {
                                try {
-                                       innerSendOfferedKey(key, isSSK, 
needPubKey, uid, source);
+                                       innerSendOfferedKey(key, isSSK, 
needPubKey, uid, source, tag);
                                } catch (NotConnectedException e) {
-                                       node.unlockUID(uid, isSSK, false, 
false, true, false);
+                                       node.unlockUID(uid, isSSK, false, 
false, true, false, tag);
                                        // Too bad.
                                } catch (Throwable t) {
-                                       node.unlockUID(uid, isSSK, false, 
false, true, false);
+                                       node.unlockUID(uid, isSSK, false, 
false, true, false, tag);
                                        Logger.error(this, "Caught "+t+" 
sending offered key");
                                }
                        }
@@ -400,13 +400,13 @@
         * on a separate thread. However, blocking disk I/O *should happen on 
this thread*. We deliberately
         * serialise it, as high latencies can otherwise result.
         */
-       protected void innerSendOfferedKey(Key key, final boolean isSSK, 
boolean needPubKey, final long uid, final PeerNode source) throws 
NotConnectedException {
+       protected void innerSendOfferedKey(Key key, final boolean isSSK, 
boolean needPubKey, final long uid, final PeerNode source, final OfferReplyTag 
tag) throws NotConnectedException {
                if(isSSK) {
                        SSKBlock block = node.fetch((NodeSSK)key, false);
                        if(block == null) {
                                // Don't have the key
                                
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, 
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, senderCounter);
-                               node.unlockUID(uid, isSSK, false, false, true, 
false);
+                               node.unlockUID(uid, isSSK, false, false, true, 
false, tag);
                                return;
                        }
                        
@@ -433,7 +433,7 @@
                                        } catch (SyncSendWaitedTooLongException 
e) {
                                                // Impossible
                                        } finally {
-                                               node.unlockUID(uid, isSSK, 
false, false, true, false);
+                                               node.unlockUID(uid, isSSK, 
false, false, true, false, tag);
                                        }
                                }
                                
@@ -452,7 +452,7 @@
                        if(block == null) {
                                // Don't have the key
                                
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, 
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, senderCounter);
-                               node.unlockUID(uid, isSSK, false, false, true, 
false);
+                               node.unlockUID(uid, isSSK, false, false, true, 
false, tag);
                                return;
                        }
                        Message df = DMT.createFNPCHKDataFound(uid, 
block.getRawHeaders());
@@ -473,7 +473,7 @@
                                        } catch (Throwable t) {
                                                Logger.error(this, "Sending 
offered key failed: "+t, t);
                                        } finally {
-                                               node.unlockUID(uid, isSSK, 
false, false, true, false);
+                                               node.unlockUID(uid, isSSK, 
false, false, true, false, tag);
                                        }
                                }
                        

Added: trunk/freenet/src/freenet/node/InsertTag.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertTag.java                               
(rev 0)
+++ trunk/freenet/src/freenet/node/InsertTag.java       2009-01-09 15:01:46 UTC 
(rev 24981)
@@ -0,0 +1,45 @@
+package freenet.node;
+
+import freenet.support.Logger;
+import freenet.support.TimeUtil;
+
+/**
+ * Represents an insert.
+ * @author Matthew Toseland <[email protected]> (0xE43DA450)
+ */
+public class InsertTag extends UIDTag {
+       
+       final boolean ssk;
+       
+       enum START {
+               LOCAL,
+               REMOTE
+       }
+       
+       START start;
+       private Throwable handlerThrew;
+       
+       InsertTag(boolean ssk, START start) {
+               super();
+               this.start = start;
+               this.ssk = ssk;
+       }
+
+       public void handlerThrew(Throwable t) {
+               handlerThrew = t;
+       }
+
+       @Override
+       public void logStillPresent(Long uid) {
+               StringBuffer sb = new StringBuffer();
+               sb.append("Still present after 
").append(TimeUtil.formatTime(age()));
+               sb.append(" : ").append(uid).append(" : start=").append(start);
+               sb.append(" ssk=").append(ssk);
+               sb.append(" thrown=").append(handlerThrew);
+               if(handlerThrew != null)
+                       Logger.error(this, sb.toString(), handlerThrew);
+               else
+                       Logger.error(this, sb.toString());
+       }
+
+}

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2009-01-08 22:08:28 UTC (rev 
24980)
+++ trunk/freenet/src/freenet/node/Node.java    2009-01-09 15:01:46 UTC (rev 
24981)
@@ -365,17 +365,17 @@
        public boolean disableHangCheckers;
        
        /** HashSet of currently running request UIDs */
-       private final HashSet<Long> runningUIDs;
-       private final HashSet<Long> runningCHKGetUIDs;
-       private final HashSet<Long> runningLocalCHKGetUIDs;
-       private final HashSet<Long> runningSSKGetUIDs;
-       private final HashSet<Long> runningLocalSSKGetUIDs;
-       private final HashSet<Long> runningCHKPutUIDs;
-       private final HashSet<Long> runningLocalCHKPutUIDs;
-       private final HashSet<Long> runningSSKPutUIDs;
-       private final HashSet<Long> runningLocalSSKPutUIDs;
-       private final HashSet<Long> runningCHKOfferReplyUIDs;
-       private final HashSet<Long> runningSSKOfferReplyUIDs;
+       private final HashMap<Long,UIDTag> runningUIDs;
+       private final HashMap<Long,RequestTag> runningCHKGetUIDs;
+       private final HashMap<Long,RequestTag> runningLocalCHKGetUIDs;
+       private final HashMap<Long,RequestTag> runningSSKGetUIDs;
+       private final HashMap<Long,RequestTag> runningLocalSSKGetUIDs;
+       private final HashMap<Long,InsertTag> runningCHKPutUIDs;
+       private final HashMap<Long,InsertTag> runningLocalCHKPutUIDs;
+       private final HashMap<Long,InsertTag> runningSSKPutUIDs;
+       private final HashMap<Long,InsertTag> runningLocalSSKPutUIDs;
+       private final HashMap<Long,OfferReplyTag> runningCHKOfferReplyUIDs;
+       private final HashMap<Long,OfferReplyTag> runningSSKOfferReplyUIDs;
        
        /** Semi-unique ID for swap requests. Used to identify us so that the
         * topology can be reconstructed. */
@@ -856,17 +856,17 @@
                transferringRequestSenders = new HashMap<NodeCHK, 
RequestSender>();
                transferringRequestHandlers = new HashSet<Long>();
                insertSenders = new HashMap<KeyHTLPair, AnyInsertSender>();
-               runningUIDs = new HashSet<Long>();
-               runningCHKGetUIDs = new HashSet<Long>();
-               runningLocalCHKGetUIDs = new HashSet<Long>();
-               runningSSKGetUIDs = new HashSet<Long>();
-               runningLocalSSKGetUIDs = new HashSet<Long>();
-               runningCHKPutUIDs = new HashSet<Long>();
-               runningLocalCHKPutUIDs = new HashSet<Long>();
-               runningSSKPutUIDs = new HashSet<Long>();
-               runningLocalSSKPutUIDs = new HashSet<Long>();
-               runningCHKOfferReplyUIDs = new HashSet<Long>();
-               runningSSKOfferReplyUIDs = new HashSet<Long>();
+               runningUIDs = new HashMap<Long,UIDTag>();
+               runningCHKGetUIDs = new HashMap<Long,RequestTag>();
+               runningLocalCHKGetUIDs = new HashMap<Long,RequestTag>();
+               runningSSKGetUIDs = new HashMap<Long,RequestTag>();
+               runningLocalSSKGetUIDs = new HashMap<Long,RequestTag>();
+               runningCHKPutUIDs = new HashMap<Long,InsertTag>();
+               runningLocalCHKPutUIDs = new HashMap<Long,InsertTag>();
+               runningSSKPutUIDs = new HashMap<Long,InsertTag>();
+               runningLocalSSKPutUIDs = new HashMap<Long,InsertTag>();
+               runningCHKOfferReplyUIDs = new HashMap<Long,OfferReplyTag>();
+               runningSSKOfferReplyUIDs = new HashMap<Long,OfferReplyTag>();
                
                this.securityLevels = new SecurityLevels(this, config);
                
@@ -2274,6 +2274,8 @@
                
                this.clientCore.start(config);
                
+               startDeadUIDChecker();
+               
                // After everything has been created, write the config file 
back to disk.
                if(config instanceof FreenetFilePersistentConfig) {
                        FreenetFilePersistentConfig cfg = 
(FreenetFilePersistentConfig) config;
@@ -2941,55 +2943,137 @@
                return is;
        }
        
-       public boolean lockUID(long uid, boolean ssk, boolean insert, boolean 
offerReply, boolean local) {
+       public boolean lockUID(long uid, boolean ssk, boolean insert, boolean 
offerReply, boolean local, UIDTag tag) {
                synchronized(runningUIDs) {
-                       if(!runningUIDs.add(uid)) {
-                               // Already present.
-                               return false;
-                       }
+                       if(runningUIDs.containsKey(uid)) return false; // 
Already present.
+                       runningUIDs.put(uid, tag);
                }
                // If these are switched around, we must remember to remove 
from both.
-               HashSet<Long> set = getUIDTracker(ssk, insert, offerReply, 
local);
-               synchronized(set) {
-                       if(logMINOR) Logger.minor(this, "Locking "+uid+" 
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+" 
size="+set.size());
-                       set.add(uid);
-                       if(logMINOR) Logger.minor(this, "Locked "+uid+" 
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+" 
size="+set.size());
+               if(offerReply) {
+                       HashMap<Long,OfferReplyTag> map = getOfferTracker(ssk);
+                       innerLock(map, (OfferReplyTag)tag, uid, ssk, insert, 
offerReply, local);
+               } else if(insert) {
+                       HashMap<Long,InsertTag> map = 
getInsertTracker(ssk,local);
+                       innerLock(map, (InsertTag)tag, uid, ssk, insert, 
offerReply, local);
+               } else {
+                       HashMap<Long,RequestTag> map = 
getRequestTracker(ssk,local);
+                       innerLock(map, (RequestTag)tag, uid, ssk, insert, 
offerReply, local);
                }
                return true;
        }
        
-       public void unlockUID(long uid, boolean ssk, boolean insert, boolean 
canFail, boolean offerReply, boolean local) {
+       private<T extends UIDTag> void innerLock(HashMap<Long, T> map, T tag, 
Long uid, boolean ssk, boolean insert, boolean offerReply, boolean local) {
+               synchronized(map) {
+                       if(logMINOR) Logger.minor(this, "Locking "+uid+" 
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+" 
size="+map.size());
+                       if(map.containsKey(uid)) {
+                               Logger.error(this, "Already have UID in 
specific map ("+ssk+","+insert+","+offerReply+","+local+") but not in general 
map: trying to register "+tag+" but already have "+map.get(uid));
+                       }
+                       map.put(uid, tag);
+                       if(logMINOR) Logger.minor(this, "Locked "+uid+" 
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+" 
size="+map.size());
+               }
+       }
+
+       public void unlockUID(long uid, boolean ssk, boolean insert, boolean 
canFail, boolean offerReply, boolean local, UIDTag tag) {
                completed(uid);
-               HashSet<Long> set = getUIDTracker(ssk, insert, offerReply, 
local);
-               synchronized(set) {
-                       if(logMINOR) Logger.minor(this, "Unlocking "+uid+" 
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+", local="+local+" 
size="+set.size());
-                       set.remove(uid);
-                       if(logMINOR) Logger.minor(this, "Unlocked "+uid+" 
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+", local="+local+" 
size="+set.size());
+               
+               if(offerReply) {
+                       HashMap<Long,OfferReplyTag> map = getOfferTracker(ssk);
+                       innerUnlock(map, (OfferReplyTag)tag, uid, ssk, insert, 
offerReply, local);
+               } else if(insert) {
+                       HashMap<Long,InsertTag> map = 
getInsertTracker(ssk,local);
+                       innerUnlock(map, (InsertTag)tag, uid, ssk, insert, 
offerReply, local);
+               } else {
+                       HashMap<Long,RequestTag> map = 
getRequestTracker(ssk,local);
+                       innerUnlock(map, (RequestTag)tag, uid, ssk, insert, 
offerReply, local);
                }
+               
                synchronized(runningUIDs) {
-                       if(!runningUIDs.remove(uid) && !canFail)
+                       UIDTag oldTag = runningUIDs.get(uid);
+                       if(oldTag == null) {
                                throw new IllegalStateException("Could not 
unlock "+uid+ '!');
+                       } else if(tag != oldTag) {
+                               Logger.error(this, "Removing "+tag+" for 
"+uid+" but "+tag+" is registered!");
+                               return;
+                       } else {
+                               runningUIDs.remove(uid);
+                       }
                }
        }
 
-       private HashSet<Long> getUIDTracker(boolean ssk, boolean insert, 
boolean offerReply, boolean local) {
+       private<T extends UIDTag> void innerUnlock(HashMap<Long, T> map, T tag, 
Long uid, boolean ssk, boolean insert, boolean offerReply, boolean local) {
+               synchronized(map) {
+                       if(logMINOR) Logger.minor(this, "Locking "+uid+" 
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+" 
size="+map.size());
+                       if(map.get(uid) != tag)
+                               Logger.error(this, "Removing "+tag+" for 
"+uid+" returned "+map.get(uid));
+                       else
+                               map.remove(uid);
+                       if(logMINOR) Logger.minor(this, "Locked "+uid+" 
ssk="+ssk+" insert="+insert+" offerReply="+offerReply+" local="+local+" 
size="+map.size());
+               }
+       }
+
+       private HashMap<Long, RequestTag> getRequestTracker(boolean ssk, 
boolean local) {
                if(ssk) {
-                       if(offerReply)
-                               return runningSSKOfferReplyUIDs;
-                       if(!local)
-                               return insert ? runningSSKPutUIDs : 
runningSSKGetUIDs;
-                       else
-                               return insert ? runningLocalSSKPutUIDs : 
runningLocalSSKGetUIDs;
+                       return local ? runningLocalSSKGetUIDs : 
runningSSKGetUIDs;
                } else {
-                       if(offerReply)
-                               return runningCHKOfferReplyUIDs;
-                       if(!local)
-                               return insert ? runningCHKPutUIDs : 
runningCHKGetUIDs;
-                       else
-                               return insert ? runningLocalCHKPutUIDs : 
runningLocalCHKGetUIDs;
+                       return local ? runningLocalCHKGetUIDs : 
runningCHKGetUIDs;
                }
        }
+
+       private HashMap<Long, InsertTag> getInsertTracker(boolean ssk, boolean 
local) {
+               if(ssk) {
+                       return local ? runningLocalSSKPutUIDs : 
runningSSKPutUIDs;
+               } else {
+                       return local ? runningLocalCHKPutUIDs : 
runningCHKPutUIDs;
+               }
+       }
+
+       private HashMap<Long, OfferReplyTag> getOfferTracker(boolean ssk) {
+               return ssk ? runningSSKOfferReplyUIDs : 
runningCHKOfferReplyUIDs;
+       }
+
+       private void startDeadUIDChecker() {
+               // TODO Auto-generated method stub
+               
+       }
+
+       private Runnable deadUIDChecker = new Runnable() {
+               public void run() {
+                       try {
+                               checkUIDs(runningLocalSSKGetUIDs);
+                               checkUIDs(runningLocalCHKGetUIDs);
+                               checkUIDs(runningLocalSSKPutUIDs);
+                               checkUIDs(runningLocalCHKPutUIDs);
+                               checkUIDs(runningSSKGetUIDs);
+                               checkUIDs(runningCHKGetUIDs);
+                               checkUIDs(runningSSKPutUIDs);
+                               checkUIDs(runningCHKPutUIDs);
+                               checkUIDs(runningSSKOfferReplyUIDs);
+                               checkUIDs(runningCHKOfferReplyUIDs);
+                       } finally {
+                               getTicker().queueTimedJob(this, 60*1000);
+                       }
+               }
+
+               private void checkUIDs(HashMap<Long, ? extends UIDTag> map) {
+                       Long[] uids;
+                       UIDTag[] tags;
+                       synchronized(map) {
+                               uids = map.keySet().toArray(new 
Long[map.size()]);
+                               tags = map.values().toArray(new 
UIDTag[map.size()]);
+                       }
+                       long now = System.currentTimeMillis();
+                       for(int i=0;i<uids.length;i++) {
+                               if(now - tags[i].createdTime > 10 * 60 * 1000) {
+                                       tags[i].logStillPresent(uids[i]);
+                                       synchronized(map) {
+                                               map.remove(uids[i]);
+                                       }
+                               }
+                       }
+               }
+       };
        
+       
        /**
         * @return Some status information.
         */
@@ -3915,7 +3999,7 @@
 
        public void addRunningUIDs(Vector<Long> list) {
                synchronized(runningUIDs) {
-                       list.addAll(runningUIDs);
+                       list.addAll(runningUIDs.keySet());
                }
        }
        

Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java  2009-01-08 22:08:28 UTC 
(rev 24980)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java  2009-01-09 15:01:46 UTC 
(rev 24981)
@@ -534,7 +534,8 @@
        public void asyncGet(Key key, boolean cache, boolean offersOnly, final 
SimpleRequestSenderCompletionListener listener) {
                final long uid = random.nextLong();
                final boolean isSSK = key instanceof NodeSSK;
-               if(!node.lockUID(uid, isSSK, false, false, true)) {
+               final RequestTag tag = new RequestTag(isSSK, 
RequestTag.START.ASYNC_GET);
+               if(!node.lockUID(uid, isSSK, false, false, true, tag)) {
                        Logger.error(this, "Could not lock UID just randomly 
generated: " + uid + " - probably indicates broken PRNG");
                        return;
                }
@@ -550,11 +551,12 @@
 
                        public void onRequestSenderFinished(int status) {
                                // If transfer coalescing has happened, we may 
have already unlocked.
-                               node.unlockUID(uid, isSSK, false, true, false, 
true);
+                               node.unlockUID(uid, isSSK, false, true, false, 
true, tag);
+                               tag.setRequestSenderFinished(status);
                                if(listener != null)
                                        listener.completed(status == 
RequestSender.SUCCESS);
                        }
-               });
+               }, tag);
        }
 
        /**
@@ -563,26 +565,28 @@
         * anything and will run asynchronously. Caller is responsible for 
unlocking the UID.
         * @param key
         */
-       void asyncGet(Key key, boolean cache, boolean offersOnly, long uid, 
RequestSender.Listener listener) {
+       void asyncGet(Key key, boolean cache, boolean offersOnly, long uid, 
RequestSender.Listener listener, RequestTag tag) {
                try {
                        Object o = node.makeRequestSender(key, node.maxHTL(), 
uid, null, false, cache, false, offersOnly);
                        if(o instanceof KeyBlock) {
-                               node.unlockUID(uid, false, false, true, false, 
true);
+                               tag.servedFromDatastore = true;
+                               node.unlockUID(uid, false, false, true, false, 
true, tag);
                                return; // Already have it.
                        }
                        RequestSender rs = (RequestSender) o;
+                       tag.setSender(rs);
                        rs.addListener(listener);
                        if(rs.uid != uid)
-                               node.unlockUID(uid, false, false, false, false, 
true);
+                               node.unlockUID(uid, false, false, false, false, 
true, tag);
                        // Else it has started a request.
                        if(logMINOR)
                                Logger.minor(this, "Started " + o + " for " + 
uid + " for " + key);
                } catch(RuntimeException e) {
                        Logger.error(this, "Caught error trying to start 
request: " + e, e);
-                       node.unlockUID(uid, false, false, true, false, true);
+                       node.unlockUID(uid, false, false, true, false, true, 
tag);
                } catch(Error e) {
                        Logger.error(this, "Caught error trying to start 
request: " + e, e);
-                       node.unlockUID(uid, false, false, true, false, true);
+                       node.unlockUID(uid, false, false, true, false, true, 
tag);
                }
        }
 
@@ -599,7 +603,8 @@
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                long startTime = System.currentTimeMillis();
                long uid = random.nextLong();
-               if(!node.lockUID(uid, false, false, false, true)) {
+               RequestTag tag = new RequestTag(false, RequestTag.START.LOCAL);
+               if(!node.lockUID(uid, false, false, false, true, tag)) {
                        Logger.error(this, "Could not lock UID just randomly 
generated: " + uid + " - probably indicates broken PRNG");
                        throw new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
                }
@@ -607,6 +612,7 @@
                        Object o = node.makeRequestSender(key.getNodeCHK(), 
node.maxHTL(), uid, null, localOnly, cache, ignoreStore, false);
                        if(o instanceof CHKBlock)
                                try {
+                                       tag.setServedFromDatastore();
                                        return new ClientCHKBlock((CHKBlock) o, 
key);
                                } catch(CHKVerifyException e) {
                                        Logger.error(this, "Does not verify: " 
+ e, e);
@@ -703,7 +709,7 @@
                                }
                        }
                } finally {
-                       node.unlockUID(uid, false, false, true, false, true);
+                       node.unlockUID(uid, false, false, true, false, true, 
tag);
                }
        }
 
@@ -711,7 +717,8 @@
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                long startTime = System.currentTimeMillis();
                long uid = random.nextLong();
-               if(!node.lockUID(uid, true, false, false, true)) {
+               RequestTag tag = new RequestTag(true, RequestTag.START.LOCAL);
+               if(!node.lockUID(uid, true, false, false, true, tag)) {
                        Logger.error(this, "Could not lock UID just randomly 
generated: " + uid + " - probably indicates broken PRNG");
                        throw new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
                }
@@ -719,6 +726,7 @@
                        Object o = node.makeRequestSender(key.getNodeKey(), 
node.maxHTL(), uid, null, localOnly, cache, ignoreStore, false);
                        if(o instanceof SSKBlock)
                                try {
+                                       tag.setServedFromDatastore();
                                        SSKBlock block = (SSKBlock) o;
                                        key.setPublicKey(block.getPubKey());
                                        return ClientSSKBlock.construct(block, 
key);
@@ -814,7 +822,7 @@
                                        }
                        }
                } finally {
-                       node.unlockUID(uid, true, false, true, false, true);
+                       node.unlockUID(uid, true, false, true, false, true, 
tag);
                }
        }
 
@@ -834,7 +842,8 @@
                PartiallyReceivedBlock prb = new 
PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE, data);
                CHKInsertSender is;
                long uid = random.nextLong();
-               if(!node.lockUID(uid, false, true, false, true)) {
+               InsertTag tag = new InsertTag(false, InsertTag.START.LOCAL);
+               if(!node.lockUID(uid, false, true, false, true, tag)) {
                        Logger.error(this, "Could not lock UID just randomly 
generated: " + uid + " - probably indicates broken PRNG");
                        throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
                }
@@ -940,7 +949,7 @@
                                }
                        }
                } finally {
-                       node.unlockUID(uid, false, true, true, false, true);
+                       node.unlockUID(uid, false, true, true, false, true, 
tag);
                }
        }
 
@@ -948,7 +957,8 @@
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                SSKInsertSender is;
                long uid = random.nextLong();
-               if(!node.lockUID(uid, true, true, false, true)) {
+               InsertTag tag = new InsertTag(true, InsertTag.START.LOCAL);
+               if(!node.lockUID(uid, true, true, false, true, tag)) {
                        Logger.error(this, "Could not lock UID just randomly 
generated: " + uid + " - probably indicates broken PRNG");
                        throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
                }
@@ -1066,7 +1076,7 @@
                                }
                        }
                } finally {
-                       node.unlockUID(uid, true, true, true, false, true);
+                       node.unlockUID(uid, true, true, true, false, true, tag);
                }
        }
 

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2009-01-08 22:08:28 UTC 
(rev 24980)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2009-01-09 15:01:46 UTC 
(rev 24981)
@@ -261,7 +261,8 @@
                
                // Do we want it? We can RejectOverload if we don't have the 
bandwidth...
                boolean isSSK = key instanceof NodeSSK;
-               node.lockUID(uid, isSSK, false, true, false);
+               OfferReplyTag tag = new OfferReplyTag(isSSK);
+               node.lockUID(uid, isSSK, false, true, false, tag);
                boolean needPubKey;
                try {
                needPubKey = m.getBoolean(DMT.NEED_PUB_KEY);
@@ -275,22 +276,22 @@
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting (overload) data 
request from "+source.getPeer()+": "+e);
                        }
-                       node.unlockUID(uid, isSSK, false, false, true, false);
+                       node.unlockUID(uid, isSSK, false, false, true, false, 
tag);
                        return true;
                }
                
                } catch (Error e) {
-                       node.unlockUID(uid, isSSK, false, false, true, false);
+                       node.unlockUID(uid, isSSK, false, false, true, false, 
tag);
                        throw e;
                } catch (RuntimeException e) {
-                       node.unlockUID(uid, isSSK, false, false, true, false);
+                       node.unlockUID(uid, isSSK, false, false, true, false, 
tag);
                        throw e;
                } // Otherwise, sendOfferedKey is responsible for unlocking. 
                
                // Accept it.
                
                try {
-                       node.failureTable.sendOfferedKey(key, isSSK, 
needPubKey, uid, source);
+                       node.failureTable.sendOfferedKey(key, isSSK, 
needPubKey, uid, source, tag);
                } catch (NotConnectedException e) {
                        // Too bad.
                }
@@ -354,7 +355,8 @@
                }
         short htl = m.getShort(DMT.HTL);
         Key key = (Key) m.getObject(DMT.FREENET_ROUTING_KEY);
-               if(!node.lockUID(id, isSSK, false, false, false)) {
+        final RequestTag tag = new RequestTag(isSSK, RequestTag.START.REMOTE);
+               if(!node.lockUID(id, isSSK, false, false, false, tag)) {
                        if(logMINOR) Logger.minor(this, "Could not lock ID 
"+id+" -> rejecting (already running)");
                        Message rejected = DMT.createFNPRejectedLoop(id);
                        try {
@@ -377,7 +379,8 @@
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting (overload) data 
request from "+source.getPeer()+": "+e);
                        }
-                       node.unlockUID(id, isSSK, false, false, false, false);
+                       tag.setRejected();
+                       node.unlockUID(id, isSSK, false, false, false, false, 
tag);
                        // Do not tell failure table.
                        // Otherwise an attacker can flood us with requests 
very cheaply and purge our
                        // failure table even though we didn't accept any of 
them.
@@ -385,7 +388,7 @@
                }
                
nodeStats.reportIncomingRequestLocation(key.toNormalizedDouble());
                //if(!node.lockUID(id)) return false;
-               RequestHandler rh = new RequestHandler(m, source, id, node, 
htl, key);
+               RequestHandler rh = new RequestHandler(m, source, id, node, 
htl, key, tag);
                node.executor.execute(rh, "RequestHandler for UID "+id+" on 
"+node.getDarknetPortNumber());
                return true;
        }
@@ -402,7 +405,8 @@
                        }
                        return true;
                }
-               if(!node.lockUID(id, isSSK, true, false, false)) {
+               InsertTag tag = new InsertTag(isSSK, InsertTag.START.REMOTE);
+               if(!node.lockUID(id, isSSK, true, false, false, tag)) {
                        if(logMINOR) Logger.minor(this, "Could not lock ID 
"+id+" -> rejecting (already running)");
                        Message rejected = DMT.createFNPRejectedLoop(id);
                        try {
@@ -422,7 +426,7 @@
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting (overload) 
insert request from "+source.getPeer()+": "+e);
                        }
-                       node.unlockUID(id, isSSK, true, false, false, false);
+                       node.unlockUID(id, isSSK, true, false, false, false, 
tag);
                        return true;
                }
                long now = System.currentTimeMillis();
@@ -431,17 +435,17 @@
                byte[] data = ((ShortBuffer) m.getObject(DMT.DATA)).getData();
                byte[] headers = ((ShortBuffer) 
m.getObject(DMT.BLOCK_HEADERS)).getData();
                short htl = m.getShort(DMT.HTL);
-                       SSKInsertHandler rh = new SSKInsertHandler(key, data, 
headers, htl, source, id, node, now);
+                       SSKInsertHandler rh = new SSKInsertHandler(key, data, 
headers, htl, source, id, node, now, tag);
                rh.receivedBytes(m.receivedByteCount());
                        node.executor.execute(rh, "SSKInsertHandler for "+id+" 
on "+node.getDarknetPortNumber());
                } else if(m.getSpec().equals(DMT.FNPSSKInsertRequestNew)) {
                        NodeSSK key = (NodeSSK) 
m.getObject(DMT.FREENET_ROUTING_KEY);
                        short htl = m.getShort(DMT.HTL);
-                       SSKInsertHandler rh = new SSKInsertHandler(key, null, 
null, htl, source, id, node, now);
+                       SSKInsertHandler rh = new SSKInsertHandler(key, null, 
null, htl, source, id, node, now, tag);
                rh.receivedBytes(m.receivedByteCount());
                        node.executor.execute(rh, "SSKInsertHandler for "+id+" 
on "+node.getDarknetPortNumber());
                } else {
-                       CHKInsertHandler rh = new CHKInsertHandler(m, source, 
id, node, now);
+                       CHKInsertHandler rh = new CHKInsertHandler(m, source, 
id, node, now, tag);
                        node.executor.execute(rh, "CHKInsertHandler for "+id+" 
on "+node.getDarknetPortNumber());
                }
                if(logMINOR) Logger.minor(this, "Started InsertHandler for 
"+id);

Added: trunk/freenet/src/freenet/node/OfferReplyTag.java
===================================================================
--- trunk/freenet/src/freenet/node/OfferReplyTag.java                           
(rev 0)
+++ trunk/freenet/src/freenet/node/OfferReplyTag.java   2009-01-09 15:01:46 UTC 
(rev 24981)
@@ -0,0 +1,27 @@
+package freenet.node;
+
+import freenet.support.Logger;
+import freenet.support.TimeUtil;
+
+/**
+ * Tag tracking an offer reply.
+ * @author Matthew Toseland <[email protected]> (0xE43DA450)
+ */
+public class OfferReplyTag extends UIDTag {
+
+       final boolean ssk;
+       
+       public OfferReplyTag(boolean isSSK) {
+               super();
+               ssk = isSSK;
+       }
+
+       @Override
+       public void logStillPresent(Long uid) {
+               StringBuffer sb = new StringBuffer();
+               sb.append("Still present after 
").append(TimeUtil.formatTime(age()));
+               sb.append(" : ssk=").append(ssk);
+               Logger.error(this, sb.toString());
+       }
+
+}

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2009-01-08 22:08:28 UTC 
(rev 24980)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2009-01-09 15:01:46 UTC 
(rev 24981)
@@ -49,18 +49,20 @@
        private long searchStartTime;
        private long responseDeadline;
        private BlockTransmitter bt;
+       private final RequestTag tag;
 
        @Override
        public String toString() {
                return super.toString() + " for " + uid;
        }
 
-       public RequestHandler(Message m, PeerNode source, long id, Node n, 
short htl, Key key) {
+       public RequestHandler(Message m, PeerNode source, long id, Node n, 
short htl, Key key, RequestTag tag) {
                req = m;
                node = n;
                uid = id;
                this.source = source;
                this.htl = htl;
+               this.tag = tag;
                if(htl <= 0)
                        htl = 1;
                this.key = key;
@@ -78,11 +80,13 @@
                } catch(NotConnectedException e) {
                        Logger.normal(this, "requestor gone, could not start 
request handler wait");
                        node.removeTransferringRequestHandler(uid);
-                       node.unlockUID(uid, key instanceof NodeSSK, false, 
false, false, false);
+                       tag.handlerThrew(e);
+                       node.unlockUID(uid, key instanceof NodeSSK, false, 
false, false, false, tag);
                } catch(Throwable t) {
                        Logger.error(this, "Caught " + t, t);
                        node.removeTransferringRequestHandler(uid);
-                       node.unlockUID(uid, key instanceof NodeSSK, false, 
false, false, false);
+                       tag.handlerThrew(t);
+                       node.unlockUID(uid, key instanceof NodeSSK, false, 
false, false, false, tag);
                }
        }
        private Exception previousApplyByteCountCall;
@@ -140,6 +144,7 @@
 
                Object o = node.makeRequestSender(key, htl, uid, source, false, 
true, false, false);
                if(o instanceof KeyBlock) {
+                       tag.setServedFromDatastore();
                        returnLocalData((KeyBlock) o);
                        return;
                }
@@ -436,7 +441,7 @@
 
        private void unregisterRequestHandlerWithNode() {
                node.removeTransferringRequestHandler(uid);
-               node.unlockUID(uid, key instanceof NodeSSK, false, false, 
false, false);
+               node.unlockUID(uid, key instanceof NodeSSK, false, false, 
false, false, tag);
        }
 
        /**

Added: trunk/freenet/src/freenet/node/RequestTag.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestTag.java                              
(rev 0)
+++ trunk/freenet/src/freenet/node/RequestTag.java      2009-01-09 15:01:46 UTC 
(rev 24981)
@@ -0,0 +1,78 @@
+package freenet.node;
+
+import java.lang.ref.WeakReference;
+
+import freenet.io.comm.NotConnectedException;
+import freenet.support.Logger;
+import freenet.support.TimeUtil;
+
+/**
+ * Tag for a request.
+ * @author Matthew Toseland <[email protected]> (0xE43DA450)
+ */
+public class RequestTag extends UIDTag {
+       
+       enum START {
+               ASYNC_GET,
+               LOCAL,
+               REMOTE
+       }
+       
+       final START start;
+       final boolean isSSK;
+       boolean servedFromDatastore;
+       WeakReference<RequestSender> sender;
+       int requestSenderFinishedCode;
+       Throwable handlerThrew;
+       boolean rejected;
+
+       public RequestTag(boolean isSSK, START start) {
+               super();
+               this.start = start;
+               this.isSSK = isSSK;
+       }
+
+       public void setRequestSenderFinished(int status) {
+               requestSenderFinishedCode = status;
+       }
+
+       public void setSender(RequestSender rs) {
+               sender = new WeakReference<RequestSender>(rs);
+       }
+
+       public void handlerThrew(Throwable t) {
+               this.handlerThrew = t;
+       }
+
+       public void setServedFromDatastore() {
+               servedFromDatastore = true;
+       }
+
+       public void setRejected() {
+               rejected = true;
+       }
+
+       @Override
+       public void logStillPresent(Long uid) {
+               StringBuffer sb = new StringBuffer();
+               sb.append("Still present after 
").append(TimeUtil.formatTime(age()));
+               sb.append(" : ").append(uid).append(" : start=").append(start);
+               sb.append(" ssk=").append(isSSK).append(" from 
store=").append(servedFromDatastore);
+               RequestSender s = sender.get();
+               if(s == null) {
+                       sb.append(" sender=null");
+               } else {
+                       sb.append(" sender=").append(s);
+                       sb.append(" status=");
+                       sb.append(s.getStatusString());
+               }
+               sb.append(" finishedCode=").append(requestSenderFinishedCode);
+               sb.append(" rejected=").append(rejected);
+               sb.append(" thrown=").append(handlerThrew);
+               if(handlerThrew != null)
+                       Logger.error(this, sb.toString(), handlerThrew);
+               else
+                       Logger.error(this, sb.toString());
+       }
+
+}

Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java        2009-01-08 
22:08:28 UTC (rev 24980)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java        2009-01-09 
15:01:46 UTC (rev 24981)
@@ -45,8 +45,9 @@
     private byte[] data;
     private byte[] headers;
     private boolean canCommit;
+    final InsertTag tag;
     
-    SSKInsertHandler(NodeSSK key, byte[] data, byte[] headers, short htl, 
PeerNode source, long id, Node node, long startTime) {
+    SSKInsertHandler(NodeSSK key, byte[] data, byte[] headers, short htl, 
PeerNode source, long id, Node node, long startTime, InsertTag tag) {
         this.node = node;
         this.uid = id;
         this.source = source;
@@ -55,6 +56,7 @@
         this.htl = htl;
         this.data = data;
         this.headers = headers;
+        this.tag = tag;
         if(htl <= 0) htl = 1;
         byte[] pubKeyHash = key.getPubKeyHash();
         pubKey = node.getKey(pubKeyHash);
@@ -77,7 +79,7 @@
             Logger.error(this, "Caught "+t, t);
         } finally {
             if(logMINOR) Logger.minor(this, "Exiting InsertHandler.run() for 
"+uid);
-            node.unlockUID(uid, true, true, false, false, false);
+            node.unlockUID(uid, true, true, false, false, false, tag);
         }
     }
 

Added: trunk/freenet/src/freenet/node/UIDTag.java
===================================================================
--- trunk/freenet/src/freenet/node/UIDTag.java                          (rev 0)
+++ trunk/freenet/src/freenet/node/UIDTag.java  2009-01-09 15:01:46 UTC (rev 
24981)
@@ -0,0 +1,23 @@
+package freenet.node;
+
+/**
+ * Base class for tags representing a running request. These store enough 
information
+ * to detect whether they are finished; if they are still in the list, this 
normally
+ * represents a bug.
+ * @author Matthew Toseland <[email protected]> (0xE43DA450)
+ */
+public abstract class UIDTag {
+       
+       final long createdTime;
+       
+       UIDTag() {
+               createdTime = System.currentTimeMillis();
+       }
+
+       public abstract void logStillPresent(Long uid);
+
+       long age() {
+               return System.currentTimeMillis() - createdTime;
+       }
+
+}

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to