Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv20740/src/freenet

Modified Files:
        ConnectionHandler.java OpenConnectionManager.java Version.java 
Log Message:
6169:
Send DataRequests asynchronously too. We get a SendFinished back. If it's unsuccessful 
we route to the next node... Minor fixes to SendFinished handling in different states.
If we have more than 2 connections, and one of them is not sending a trailer, queue 
messages to that conn despite it being over the limits. Should limit connection open 
floods... at the expense of monstrous message send times.
Make sending QueryRejected's asynchronous (relatively obscure cases, the majority are 
already covered).
Increase messageStoreSize to 10,000 because I was getting lost non-RequestDone 
messages.
Some more work on routing termination.
Send more DataNotFound's async.
Major bugfixes in OCM.createConn.
Don't set priority of WSL/RSL to max. Looked like it might be starving stuff.
Ignore SendFinished in RequestDone.
Logging, indenting, style, etc.


Index: ConnectionHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandler.java,v
retrieving revision 1.140
retrieving revision 1.141
diff -u -r1.140 -r1.141
--- ConnectionHandler.java      30 Aug 2003 23:16:50 -0000      1.140
+++ ConnectionHandler.java      4 Sep 2003 22:45:05 -0000       1.141
@@ -960,8 +960,17 @@
                                        
                                        byte[] b = currentSender.toSend;
                                        int l = b.length;
+                                       Core.logger.log(this, "Removed (A) message of 
size "+l+
+                                                                       " from 
sendingQueue: now "+
+                                                                       
sendingQueue.size()+" for "+this,
+                                                                       Logger.DEBUG);
+                                                                       
                                        synchronized(sendingQueueBytesLock) {
                                                sendingQueueBytes -= l;
+                                               Core.logger.log(this, "Removed (A) 
message of size "+l+
+                                                                               " from 
sendingQueue: now "+
+                                                                               
sendingQueueBytes+" bytes for "+this,
+                                                                               
Logger.DEBUG);
                                        }
                                        
                                        if(l > maxPacketLength) {
@@ -976,8 +985,18 @@
                                                        break;
                                                } else {
                                                        
sendingQueue.add(currentSender);
+                                                       Core.logger.log(this, "Added 
(B) message of size "+
+                                                                                      
 l+" to sendingQueue: now "+
+                                                                                      
 sendingQueue.size()+" for "+this,
+                                                                                      
 Logger.DEBUG);
                                                        
synchronized(sendingQueueBytesLock) {
                                                                sendingQueueBytes += l;
+                                                               Core.logger.log(this, 
"Added (B) message of "+
+                                                                                      
         "size "+l+" to "+
+                                                                                      
         "sendingQueue: now "+
+                                                                                      
         sendingQueueBytes+
+                                                                                      
         " bytes for "+this,
+                                                                                      
         Logger.DEBUG);
                                                        }
                                                        // Should be the last item on 
the queue...
                                                        // FIXME: LOCKING!
@@ -986,8 +1005,18 @@
                                        } else if (l + curPacketLength > 
maxPacketLength) {
                                                sendingQueue.add(0, currentSender);
                                                // FIXME: LOCKING!
+                                               Core.logger.log(this, "Added (C) 
message of size "+l+
+                                                                               " to 
sendingQueue: now "+
+                                                                               
sendingQueue.size()+" for "+this,
+                                                                               
Logger.DEBUG);
                                                synchronized(sendingQueueBytesLock) {
                                                        sendingQueueBytes += l;
+                                                       Core.logger.log(this, "Added 
(C) message of "+
+                                                                                      
 "size "+l+" to "+
+                                                                                      
 "sendingQueue: now "+
+                                                                                      
 sendingQueueBytes+
+                                                                                      
 " bytes for "+this,
+                                                                                      
 Logger.DEBUG);
                                                }
                                                break;
                                        } else {
@@ -2205,7 +2234,7 @@
                                        sendingCount++;
                                        if (raw.trailingFieldLength > 0) {
                                                trailingPresent=true;
-                                               if (logDEBUG)Core.logger.log(this, 
"enqueing trailer, queue size " + sendingQueue.size(),Logger.DEBUG);
+                                               if (logDEBUG)Core.logger.log(this, 
"enqueing trailer, queue size " + sendingQueue.size()+" on "+this,Logger.DEBUG);
                                        }
                                        if(identity != null && 
                                           ocm.findFreeConnection(identity) == null)
@@ -2267,7 +2296,10 @@
                                        //}// end of monitor
                                        if (sendingQueue.size()==0 && 
sentMessages.size() == 0) {
                                                try {
-                                                       if 
(logDEBUG)Core.logger.log(this,"executing messageSend immediately",Logger.DEBUG);
+                                                       if (logDEBUG) 
+                                                               
Core.logger.log(this,"executing messageSend "+
+                                                                                      
         "immediately ("+this+")",
+                                                                                      
         Logger.DEBUG);
                                                        currentSender = ms;
                                                        sentMessages.add(ms);
                                                        ms.start();
@@ -2280,8 +2312,18 @@
                                                }
                                        } else {
                                                sendingQueue.add(ms);
+                                               Core.logger.log(this, "Added (D) 
message of size "+
+                                                                               
b.length+" to sendingQueue: now "+
+                                                                               
sendingQueue.size()+" for "+this,
+                                                                               
Logger.DEBUG);
                                                synchronized(sendingQueueBytesLock) {
                                                        sendingQueueBytes += b.length;
+                                                       Core.logger.log(this, "Added 
(D) message of "+
+                                                                                      
 "size "+b.length+" to "+
+                                                                                      
 "sendingQueue: now "+
+                                                                                      
 sendingQueueBytes+
+                                                                                      
 " bytes for "+this,
+                                                                                      
 Logger.DEBUG);
                                                }
                                                if (logDEBUG) 
                                                        
Core.logger.log(this,"scheduling ms on queue."+
@@ -2807,7 +2849,11 @@
                totalDataReceived += amount;            
        }
 
-
+       public final boolean reallySending() {
+               return trailingPresent;
+       }
+       
+       
     /**
      [EMAIL PROTECTED]    whether the connection is currently sending something
      */
@@ -2815,11 +2861,16 @@
         //return sending.count() > 0;
                // Don't synchronize here!
         //return sendingCount > 0;
+               Core.logger.log(this, "trailingPresent="+trailingPresent+
+                                               ", 
sendingQueue.size()="+sendingQueue.size()+
+                                               " (max "+maxSendingQueueLength+"), 
sendingQueueBytes="+
+                                               sendingQueueBytes+" (max 
"+maxSendingQueueBytes+
+                                               ") for "+this+")", Logger.DEBUG);
                return trailingPresent || sendingQueue.size() > maxSendingQueueLength 
                        || sendingQueueBytes > maxSendingQueueBytes;
                // if _that_ many, we are probably stuck, maybe because of an NIO 
artifact
-       /*FIXME:  this is based on local observations on a node with 25K uplink cap.
-       
+               /*FIXME:  this is based on local observations on a node with 25K 
uplink cap.
+                 
         Setting it to 64 seems to limit the mean of sendTime to ~40-60 seconds.  
Value of
         32 has for the last 9 hours given me steady 20-40 second minute means and 
around 30 hourly mean
         for 10+msg/sec load. I'm trying 24 now, but may revert it to 32.

Index: OpenConnectionManager.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/OpenConnectionManager.java,v
retrieving revision 1.92
retrieving revision 1.93
diff -u -r1.92 -r1.93
--- OpenConnectionManager.java  3 Sep 2003 01:38:52 -0000       1.92
+++ OpenConnectionManager.java  4 Sep 2003 22:45:05 -0000       1.93
@@ -158,6 +158,9 @@
      * node identified.
      */
     public synchronized ConnectionHandler findFreeConnection(Identity id) {
+               Core.logger.log(this, "findFreeConnection("+id+")",
+                                               new Exception("debug"), Logger.DEBUG);
+               int sendingConns = 0;
         for (Enumeration e = chs.getAll(id) ; e.hasMoreElements() ; ) {
             ConnectionHandler res = (ConnectionHandler) e.nextElement();
             if (!res.isOpen()) {
@@ -167,6 +170,7 @@
                                // It will be terminated eventually
                                // Do not remove it from OCM because it will then be 
orphaned and take up a fd even though it is not available for sending.
             } else if (!res.sending()) {
+                               Core.logger.log(this, "Found "+res, Logger.DEBUG);
                 // found one
                 lru.push(res);
                 // Mark this connection as cached
@@ -175,11 +179,26 @@
                 // the Routing implementation.
                 res.setCached(true);
                 return res;
-            }
-        } 
+            } else {
+                               sendingConns++;
+                               Core.logger.log(this, "Skipping: "+res+": sending",
+                                                               Logger.DEBUG);
+                       }
+        }
+               if(sendingConns < 2) return null;
+               for (Enumeration e = chs.getAll(id) ; e.hasMoreElements() ; ) {
+                       ConnectionHandler res = (ConnectionHandler) e.nextElement();
+                       if (!res.isOpen()) {
+                               // Ignore it
+                       } else if (!res.reallySending()) {
+                               lru.push(res);
+                               res.setCached(true);
+                               return res;
+                       }
+               }
         return null;
     }
-
+       
     /**
      * This attempts to search for the open connection to id best suited for
      * sending a message. The formula for this is the connection with the 
@@ -261,6 +280,7 @@
                ConnectionJob ct = null;
                boolean updatedRefcount = false;
                
+               boolean weStarted = false;
                try {
                        synchronized(connectionJobs) {
                                while(ct == null || ct.done) {
@@ -275,12 +295,17 @@
                                        break;
                                }
                                if(ct != null) {
+                                       Core.logger.log(this, "Got "+ct+", waiting on 
it",
+                                                                       Logger.DEBUG);
                                        updatedRefcount = true;
                                        ct.incRefcount();
                                } else {
+                                       weStarted = true;
                                        ct = new ConnectionJob(c, p);
                                        connectionJobs.put(p, ct);
                                        updatedRefcount = true;
+                                       Core.logger.log(this, "Created new ConnJob: 
"+ct,
+                                                                       Logger.DEBUG);
                                        ct.incRefcount();
                                }
                        }
@@ -289,7 +314,8 @@
                                if(timeout == -1) {
                                        ct.run();
                                } else {
-                                       tf.getThread(ct);
+                                       if(weStarted)
+                                               tf.getThread(ct);
                                        //tm.forceRun(ct);
                                        // Restored pooled threads (oskar 20020204)
                                        //Thread job = new Thread(ct, 
@@ -367,8 +393,10 @@
         // but nothing guarantees that some other thread won't have sent a
         // message with a huge trailing field by the time you actually
         // try to send a message on it.
+               Core.logger.log(this, "getConnection("+p+","+timeout+")",
+                                               Logger.DEBUG);
         ConnectionHandler ch = findFreeConnection(p.getIdentity()); 
-
+               
         if (ch == null) {
             ch = createConnection(c, p, timeout);
         }

Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.361
retrieving revision 1.362
diff -u -r1.361 -r1.362
--- Version.java        3 Sep 2003 18:10:12 -0000       1.361
+++ Version.java        4 Sep 2003 22:45:05 -0000       1.362
@@ -18,7 +18,7 @@
     public static String protocolVersion = "1.46";
     
     /** The build number of the current revision */
-    public static final int buildNumber = 6168;
+    public static final int buildNumber = 6169;
     // 6028: may 3; ARK retrieval fix
 
     public static final int ignoreBuildsAfter = 6500;

_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to