Update of /cvsroot/freenet/freenet/src/freenet/transport
In directory sc8-pr-cvs1:/tmp/cvs-serv25857/src/freenet/transport

Modified Files:
      Tag: stable
        AbstractSelectorLoop.java ListenSelectorLoop.java 
        NIOReader.java ReadSelectorLoop.java 
        ThrottledSelectorLoop.java WriteSelectorLoop.java 
        tcpAddress.java tcpConnection.java tcpTransport.java 
Log Message:
5029: Merge from unstable after months of work. MASSIVE changes.
Highlights:
* Next Generation Routing, massive related changes
* Major changes to handling of messages and connections (PeerHandler and related changes)
* Even more non-blocking I/O
* Documentation improvements
* Lots of new diagnostics and config options
* Lots of bug fixes and performance tweaking
* Probably lots of new bugs too!


Index: AbstractSelectorLoop.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/AbstractSelectorLoop.java,v
retrieving revision 1.15.2.17
retrieving revision 1.15.2.18
diff -u -w -r1.15.2.17 -r1.15.2.18
--- AbstractSelectorLoop.java   23 Aug 2003 12:04:43 -0000      1.15.2.17
+++ AbstractSelectorLoop.java   28 Oct 2003 20:20:48 -0000      1.15.2.18
@@ -30,6 +30,8 @@
        
        private static LinkedList closeQueue = null;
        private static LinkedList preCloseQueue = null;
+       private static Object preCloseQueueObject = new Object(); //necessary cause we 
change preCloseQueue
+       
        private static boolean isWindows = 
(System.getProperty("os.name").toLowerCase().indexOf("windows") != -1);
        
        private static CloseThread closeThread= null;
@@ -160,12 +162,14 @@
                public Connection conn;
                public Object attachment;
                public SocketChannel sc;
+               public long timeStamp; //debugging.  Can be removed eventually
 
                public ClosePair(Connection conn, Object attachment, 
                                                 SocketChannel sc) {
                        this.conn = conn;
                        this.attachment = attachment;
                        this.sc = sc;
+                       timeStamp = System.currentTimeMillis();
                }
                
                public String toString() {
@@ -224,6 +228,9 @@
        }
        
        protected abstract int myKeyOps();
+       public SelectionKey keyFor(SelectableChannel ch) {
+               return ch.keyFor(sel);
+       }
        
        /**
         * register/unregister waiters in the queues
@@ -276,6 +283,10 @@
                                                } catch (ClassCastException e) {
                                                        // Not an NIOCallback
                                                        continue;
+                                               } catch (CancelledKeyException e) {
+                                                       //do nothing?
+                                                       if (logDebug)
+                                                               
Core.logger.log(this,"cought "+e,Logger.DEBUG);
                                                }
                                        } else {
                                                notRegistered.add(current);
@@ -291,20 +302,39 @@
                //then remove
                synchronized(unregisterWaiters) {
                while (unregisterWaiters.size() >0) {
-                       ChannelAttachmentPair current = 
(ChannelAttachmentPair)unregisterWaiters.removeFirst();
-                       if (current.channel!=null) //we have a channel
-                               current.channel.keyFor(sel).cancel();
+                       ChannelAttachmentPair current;
+                       try {
+                               current = 
+                                       
(ChannelAttachmentPair)unregisterWaiters.removeFirst();
+                       } catch (NoSuchElementException e) {
+                               Core.logger.log(this, "Parallel removal of elements in 
"+this+
+                                                               "?: "+e, e, 
Logger.ERROR);
+                               break;
+                       }
+                       if (current.channel!=null) {
+                               //we have a channel
+                               SelectionKey k = current.channel.keyFor(sel);
+                               if(k != null) k.cancel();
+                       }
                        // Not used by WSL, so we don't need to tell it
                        else if (current.attachment!=null) { //we have only the 
attachment
                                Iterator i = sel.keys().iterator();
                                while(i.hasNext()) {
                                        SelectionKey curKey = (SelectionKey)i.next();
-                                       if 
(curKey.attachment().equals(current.attachment)){
+                                       if(curKey == null) continue;
+                                       if(!curKey.isValid()) continue;
+                                       Object attachment = curKey.attachment();
+                                       if(attachment == null) {
+                                               if(logDebug)
+                                                       Core.logger.log(this, "Key 
"+curKey+" has null "+
+                                                                                      
 "attachment unregistering "+
+                                                                                      
 current, Logger.ERROR);
+                                               curKey.cancel();
+                                       } else if 
(attachment.equals(current.attachment)) {
                                                curKey.cancel();
                                                break;
                                        }
                                }
-
                        }
                        if (current.attachment!=null)
                                try{
@@ -347,7 +377,15 @@
                }
                
                try {
+                       if(sel.isOpen()) {
+                               try {
                        sel.close();
+                               } catch (Throwable t) {
+                                       Core.logger.log(this, "Caught "+t+" closing 
selector in reset()",
+                                                                       t, 
Logger.ERROR);
+                               }
+                               // Try open() anyway
+                       }
                        
                        sel = Selector.open();
                        
@@ -397,26 +435,63 @@
         * channels are closed
         * now this cancel()'s the keys and doesn't notify the close thread --zab
         */
-       private void handleCloseQueuePipe()
-       {
-               synchronized(closeQueue) {
-                       synchronized(preCloseQueue) {
-                               //boolean wasEmpty = closeQueue.isEmpty();
-                               //boolean itemAdded = false;
+       private void handleCloseQueuePipe() {
+               //We aren't synchronized on preCloseQueue here but it doesn't mean the 
end of the
+               //world that we queue the close a little while later instead (as of
+               //2003-10-16 this method is called by three selector threads).
+               //This extra check is done to avoid the 
'synchronized(preCloseQueueObject)'
+               //scope below if it probably wouldn't gain us something to enter it
+               //(locking optimization)
+               if(preCloseQueue.isEmpty())
+                       return;
+
+               //Remove all entries from the preCloseQueue into a local queue
+               //in order to prevent other threads from beeing blocked on 
preCloseQueue
+               //during the processing below (locking optimization)
+               LinkedList workList = new LinkedList();
+               synchronized (preCloseQueueObject) {
                                while(!preCloseQueue.isEmpty()) {
-                                       ClosePair current = 
(ClosePair)preCloseQueue.removeFirst();
-                                       if (current.sc != null && 
current.sc.keyFor(sel) != null)
-                                               current.sc.keyFor(sel).cancel();
-                                       closeQueue.addLast(current);
-                               //      itemAdded = true;
+                               workList.addLast(preCloseQueue.removeFirst());
+                       }
                                }
                                        
-                               /*if(wasEmpty && itemAdded)
-                                       closeQueue.notify();*/
+               //Cancel the keys that have been queued for closing on all ?known?
+               //selector threads
+               Iterator it = workList.iterator();
+               ClosePair current;
+               while(it.hasNext()){
+                       current = (ClosePair)it.next();
+                       if ( current.sc != null) {
+                               SelectionKey k = 
tcpConnection.getRSL().keyFor(current.sc);
+                               if (k != null) {
+                                       k.cancel();
+                               } else {
+                                       k = tcpConnection.getWSL().keyFor(current.sc);
+                                       if (k != null) {
+                                               k.cancel();
+                                       }
+                               }
                        }
                }       
+               
+               //An extra check here to avoid entering the 'synchronized(closeQueue)' 
scope below
+               //if we dont actually need to (locking optimization)
+               if(workList.isEmpty())
+                       return;
+
+               //Finished processing all the closerequests we popped from the
+               //preCloseQueue earlier in this method, now queue them for actual 
closing
+               //From here the closeThread will pick them up at its leisure...
+               synchronized (closeQueue) {
+                               closeQueue.addAll(workList);
+               }
        }
        
+       /**
+        * Do a selection operation
+        * @return true if we selected, false if something failed, the usual
+        * response would be to try again
+        */
        protected final boolean mySelect(int x) throws IOException {
                boolean windowsBugHappened = false;
                try{
@@ -519,6 +594,20 @@
                                }
                                return false;
                        } else throw e;
+               } catch (ClosedSelectorException e) {
+                       Core.logger.log(this, "WTF?!: "+e, e,
+                                                       Logger.ERROR);
+                       try {
+                               reset();
+                       } catch (Throwable t) {
+                               Core.logger.log(this, "Reopening selector FAILED!: 
"+t, t,
+                                                               Logger.ERROR);
+                               System.err.println("Reopening selector FAILED! Tried 
to handle: "+e);
+                               e.printStackTrace(System.err);
+                               System.err.println("But then caught: "+t);
+                               t.printStackTrace(System.err);
+                       }
+                       return false;
                }
                if(!windowsBugHappened)
                        this.consecutiveWindowsBugEncounters =0;
@@ -539,11 +628,11 @@
                int consecutiveDuds = 0;
                while(!stop) {
                        fastReturn = false;
-                       logDebug = Core.logger.shouldLog(Logger.DEBUG);
+                       logDebug = Core.logger.shouldLog(Logger.DEBUG, this);
                        try{ //a very wide and generic net
                        
                        //cancel() all keys before beforeSelect()
-                       handleCloseQueuePipe();
+                       handleCloseQueuePipe();  // move this down to rsl --zab
                        
                        beforeSelect();
                        
@@ -558,8 +647,12 @@
                        //sel.selectedKeys().clear();
                        //select on the selector
                        iter++;
+                       long now = System.currentTimeMillis();
                        if(!mySelect(timeout)) continue;
-                       
+                       long selected = System.currentTimeMillis();
+                       if(logDebug) Core.logger.log(this, "Returned from selector in 
"+
+                                                       (selected-now)+" millis",
+                                                       Logger.DEBUG);
                        
                                                
 //                     Core.logger.log(this, "Returned from selector, 
"+currentlyActive+
@@ -570,16 +663,13 @@
                        //currentSet.addAll(sel.selectedKeys());
                        
                        if(logDebug)
-                               Core.logger.log(this, "Keys ready before fixKeys: "+
-                                                               currentlyActive+"/"+
-                                                               sel.keys().size(), 
Logger.DEBUG);
+                               Core.logger.log(this, "Keys ready before fixKeys: " + 
currentlyActive + "/" + sel.keys().size(), Logger.DEBUG);
                        
                        fixKeys();
                        //if (currentlyActive != currentSet.size()) 
Core.logger.log(this, "read the freaking book! "+ currentlyActive +" != "+ 
currentSet.size() ,Logger.ERROR);
                        currentlyActive = currentSet.size();
                        if(logDebug)
-                               Core.logger.log(this, "Keys ready: 
"+currentlyActive+"/"+
-                                                               sel.keys().size(), 
Logger.DEBUG);
+                               Core.logger.log(this, "Keys ready: " + currentlyActive 
+ "/" + sel.keys().size(), Logger.DEBUG);
                        
                        //if at this point no channels are active, it means
                        //we were just woken up or timed out.
@@ -624,14 +714,11 @@
                        System.runFinalization();
                        freenet.node.Main.dumpInterestingObjects();
                        try {
-                               Core.logger.log(this, "Ran emergency GC in 
"+getClass().getName(),
-                                                               Logger.ERROR);
+                               Core.logger.log(this, "Ran emergency GC in " + 
getClass().getName(), Logger.ERROR);
                        } catch (Throwable any) {};
                } catch(Throwable t){
                        try {
-                               Core.logger.log(this, 
-                                                               "Caught throwable in 
AbstractSelectorLoop!: "
-                                                               +t, t, Logger.ERROR);
+                               Core.logger.log(this, "Caught throwable in 
AbstractSelectorLoop!: " + t, t, Logger.ERROR);
                                t.printStackTrace();
                        } catch (Throwable x) {};
                }
@@ -652,7 +739,7 @@
                if(chan.attachment != null)
                        ((NIOCallback)(chan.attachment)).queuedClose();
                //if(isWindows)
-                       synchronized(preCloseQueue) {
+                       synchronized(preCloseQueueObject) {
                                preCloseQueue.addLast(chan);
                        }
        /*      else
@@ -664,7 +751,7 @@
                        }*/
        }
 
-       public void queueClose(Connection conn, SocketChannel sc) {
+       public final void queueClose(Connection conn, SocketChannel sc) {
                Socket s = null;
                try {
                        s = ((tcpConnection)conn).getSocket();
@@ -673,7 +760,7 @@
                        queueClose(new ClosePair(conn, null, sc));
        }
        
-       public void queueClose(Connection conn, NIOCallback cb, SocketChannel sc) {
+       public final void queueClose(Connection conn, NIOCallback cb, SocketChannel 
sc) {
                Socket s = null;
                try {
                        s = ((tcpConnection)conn).getSocket();
@@ -682,33 +769,26 @@
                        queueClose(new ClosePair(conn, cb, sc));
        }
        
-       public void queueClose(SocketChannel chan) {
-               if(!chan.isConnected()) return;
-               if(!chan.isOpen()) return;
-               Connection c = tcpConnection.getConnectionForSocket(chan.socket());
-               if(c == null) {
-                       if(!chan.isConnected()) return;
-                       if(!chan.isOpen()) return;
-                       throw new IllegalArgumentException("Fed socket not connected 
to a tcpConnection!: "+chan);
-               }
-               if (closeUniqueness.containsKey(c)) return;
-               queueClose(new ClosePair(c, null, chan));
+       public final void queueClose(SocketChannel chan) {
+               queueClose(chan, null);
        }
        
-       public void queueClose(SocketChannel chan, NIOCallback nc) {
-               if(!chan.isConnected()) return;
-               if(!chan.isOpen()) return;
-               Connection c = tcpConnection.getConnectionForSocket(chan.socket());
+       public final void queueClose(SocketChannel chan, NIOCallback nc) {
+               if(chan == null) return;
+               Socket sock = chan.socket();
+               if(sock == null) return;
+               Connection c = tcpConnection.getConnectionForSocket(sock);
                if(c == null) {
-                       if(!chan.isConnected()) return;
-                       if(!chan.isOpen()) return;
-                       throw new IllegalArgumentException("Fed socket not connected 
to a tcpConnection!: "+chan+","+nc);
+                       if(chan.isConnected() && chan.isOpen())
+                               throw new IllegalArgumentException("Fed socket not 
connected "+
+                                                                                      
            "to a tcpConnection!: "+chan);
+                       else return;
                }
                if (closeUniqueness.containsKey(c)) return;
                queueClose(new ClosePair(c,nc,chan));
        }
        
-       public void queueClose(ChannelAttachmentPair pair) {
+       public final void queueClose(ChannelAttachmentPair pair) {
                NIOCallback cb = null;
                if(pair.attachment instanceof NIOCallback)
                        cb = (NIOCallback)(pair.attachment);
@@ -729,11 +809,12 @@
                                        synchronized(closeQueue) {
                                                while(closeQueue.isEmpty()) {
                                                        try {
-                                                               closeQueue.wait(500);
+                                                               //closeQueue.wait(500);
+                                                               closeQueue.wait(); 
//Wait until someone tells us to close.. dont start closing when we aren't told to
                                                        } catch (InterruptedException 
e) {};
                                                        if(terminateCloseThread) 
return;
                                                }
-                                               current = 
(ClosePair)(closeQueue.removeFirst());
+                                               current = 
(ClosePair)closeQueue.removeFirst();
                                        }
                                        if(current != null) {
                                                Connection c = current.conn;
@@ -741,8 +822,9 @@
                                                        try {
                                                                c.close(true);
                                                        } catch (Throwable t) {
-                                                               Core.logger.log(this, 
"Caught "+t+" closing "+
-                                                                                      
         c, t, Logger.ERROR);
+                                                               Core.logger.log(this, 
"Caught " + t + " closing " + c, t, Logger.ERROR);
+                                                       } finally {
+                                                               
Core.diagnostics.occurrenceContinuous("closePairLifetime", System.currentTimeMillis() 
- current.timeStamp);
                                                        }
                                                        //notify the callback
                                                        if (current.attachment !=null){
@@ -767,8 +849,7 @@
                                        } catch (Throwable any) {};
                                } catch (Throwable t) {
                                        try {
-                                               String s = "Caught Throwable "+t+
-                                                       " in background close thread!";
+                                               String s = "Caught Throwable " + t + " 
in background close thread!";
                                                Core.logger.log(this, s, t, 
Logger.ERROR);
                                                // FIXME
                                                // System.err, System.out may be files 
on disk

Index: ListenSelectorLoop.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/ListenSelectorLoop.java,v
retrieving revision 1.4.2.7
retrieving revision 1.4.2.8
diff -u -w -r1.4.2.7 -r1.4.2.8
--- ListenSelectorLoop.java     16 Aug 2003 17:30:07 -0000      1.4.2.7
+++ ListenSelectorLoop.java     28 Oct 2003 20:20:48 -0000      1.4.2.8
@@ -91,11 +91,11 @@
         boolean success = true;
         oomSleep = 1000;
         try {
-            Iterator i = currentSet.iterator();
+            Iterator i = sel.keys().iterator();
             while (i.hasNext()) {
                 try {
                    SelectionKey curKey = (SelectionKey)i.next();
-                   i.remove(); //yes I think its a good idea
+                                       //i.remove(); //yes I think its a good idea // 
reenable iff switch back to selectedKeys
                     ServerSocketChannel sc = (ServerSocketChannel)curKey.channel();
                    SocketChannel chan = null;
                     NIOListener listener = (NIOListener)curKey.attachment();
@@ -116,7 +116,8 @@
                     System.runFinalization();
                     oomSleep *= 2;
                     try {
-                        String s = "Attempted to recover from OutOfMemoryError";
+                        String s = "Attempted to recover from OutOfMemoryError "+
+                                                       e.toString();
                         freenet.Core.logger.log(this, s, 
                                                 freenet.support.Logger.ERROR);
                         System.err.println(s);

Index: NIOReader.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/NIOReader.java,v
retrieving revision 1.2.2.1
retrieving revision 1.2.2.2
diff -u -w -r1.2.2.1 -r1.2.2.2
--- NIOReader.java      1 Jul 2003 02:27:18 -0000       1.2.2.1
+++ NIOReader.java      28 Oct 2003 20:20:48 -0000      1.2.2.2
@@ -1,7 +1,6 @@
 /* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
 package freenet.transport;
 
-
 import java.nio.*;
 public interface NIOReader extends NIOCallback {
 /**

Index: ReadSelectorLoop.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/ReadSelectorLoop.java,v
retrieving revision 1.17.2.8
retrieving revision 1.17.2.9
diff -u -w -r1.17.2.8 -r1.17.2.9
--- ReadSelectorLoop.java       2 Aug 2003 02:36:19 -0000       1.17.2.8
+++ ReadSelectorLoop.java       28 Oct 2003 20:20:48 -0000      1.17.2.9
@@ -30,6 +30,15 @@
        
        private final LinkedList maintenanceQueue;
 
+    private class MaintenancePair {
+        NIOReader attachment;
+        SocketChannel chan;
+        MaintenancePair(NIOReader reader, SocketChannel sc) {
+            this.attachment=reader;
+            this.chan=sc;
+        }
+    }
+    
        //this needs to be taken from the settings.. wide guess is 2K
        //public static final int BUFFER_SIZE=2048;
        
@@ -42,9 +51,10 @@
          will be used.  We can't however ignore those users on 100MBit connections 
;-)) */
        protected static final int MAX_CONC_CHANNELS=20;
        
-       public ReadSelectorLoop(Bandwidth bw) throws IOException{
+    public ReadSelectorLoop(Bandwidth bw, int timerGranularity) 
+        throws IOException{
                
-               super(bw);
+        super(bw, timerGranularity);
                //create the buffer stuff
                bufferMap = new HashMap(MAX_CONC_CHANNELS);
        //      buffers = new ByteBuffer[MAX_CONC_CHANNELS];
@@ -104,25 +114,89 @@
                }
                //make sure no message got stuck behind a trailing field
                
+        if(logDebug)
+            Core.logger.log(this, "beforeSelect(), mq.size="+
+                            maintenanceQueue.size(), Logger.DEBUG);
+        
                while (maintenanceQueue.size() > 0) {
-                       NIOReader current;
-                       //process could take a while, so lock just this
+            MaintenancePair mp;
                        synchronized(maintenanceQueue) {
-                               current = (NIOReader)maintenanceQueue.removeFirst();
+                mp = (MaintenancePair)maintenanceQueue.removeFirst();
+            }
+            SocketChannel chan = mp.chan;
+            NIOReader current = mp.attachment;
+            SelectionKey k = chan.keyFor(sel);
+            //process could take a while, so lock just this
+            try {
+                int status = current.process(null);
+                if(logDebug)
+                    Core.logger.log(this, "Running maintenance on "+chan+
+                                    ":"+current+", returned "+status,
+                                    Logger.DEBUG);
+                if(status == -1) {
+                    if(logDebug)
+                        Core.logger.log(this, "Closing connection "+chan+":"+
+                                        current+" (process returned -1)", 
+                                        Logger.DEBUG);
+                    if(k != null) {
+                        k.attach(null);
+                        k.cancel();
+                        current.unregistered();
+                    } else {
+                        if(logDebug)
+                            Core.logger.log(this, "Maintenance process returned -1 
but not registered on selector, queuing for unregistration", Logger.DEBUG);
+                        unregisterWaiters.add(new ChannelAttachmentPair
+                                              (chan, current));
+                    }
+                    queueClose((SocketChannel)chan,current);
+                } else if (status == 0) {
+                    if (logDebug) Core.logger.log(this, "Cancelling "+chan+
+                                                  ":"+current+"(returned 0)",
+                                                  Logger.DEBUG);
+                    if(k != null) {
+                        k.cancel();
+                    } else {
+                        if(logDebug)
+                            Core.logger.log(this, "Maintenance process returned 0 but 
not registered on selector, queuing for unregistration", Logger.DEBUG);
+                        unregisterWaiters.add(new ChannelAttachmentPair
+                                              (chan, current));
+                    }
+                    synchronized(dontReregister) {
+                        dontReregister.add(chan);
+                    }
+                    if (logDebug)Core.logger.log(this, "Cancelled "+chan+": "+
+                                                 (k==null?"(null)":
+                                                  Boolean.toString(k.isValid()))
+                                                 +":"+k, Logger.DEBUG);
+                }
+            } catch (OutOfMemoryError e) {
+                System.gc();
+                System.runFinalization();
+                System.gc();
+                System.runFinalization();
+                freenet.node.Main.dumpInterestingObjects();
+                try {
+                    Core.logger.log(this, "Ran emergency GC in "+
+                                    getClass().getName(), Logger.ERROR);
+                } catch (Throwable any) {};
+            } catch (Throwable t) {
+                System.err.println("Caught "+t+" running maintenance queue");
+                t.printStackTrace(System.err);
+                Core.logger.log(this, "Caught "+t+" running maintenance queue",
+                                t, Logger.ERROR);
                        }
-                       current.process(null);
                }
-               
                throttleBeforeSelect();
-               
        }
        
-       public final void scheduleMaintenance(NIOReader cb) {
+    public final void scheduleMaintenance(SocketChannel cb,
+                                          NIOReader attachment) {
                if(logDebug)
-                       Core.logger.log(this, "Scheduling maintenance on "+cb, 
-                                                       new Exception("debug"), 
Logger.DEBUG);
+            Core.logger.log(this, "Scheduling maintenance on "+cb+
+                            ":"+attachment, new Exception("debug"), 
+                            Logger.DEBUG);
                synchronized (maintenanceQueue) {
-                       maintenanceQueue.addLast(cb);
+            maintenanceQueue.addLast(new MaintenancePair(attachment, cb));
                }
        }
        
@@ -269,9 +343,14 @@
                                        }
                                }
                        } catch (Throwable e) {
-                               e.printStackTrace();
+                try {
                                Core.logger.log(this, "Unexpected throwable reading 
data for "+
                                                                nc+": "+e, e, 
Logger.NORMAL);
+                    e.printStackTrace();
+                } catch (Throwable t) {
+                    Core.logger.log(this, "Unexpected throwable reading data: "+
+                                    e, e, Logger.NORMAL);
+                }
                                size = -1; // try unregistering it...
                        }
                        if (size == -1) {
@@ -291,7 +370,7 @@
                                nc.unregistered();
                                noneWorking = false;
                        } else if (size > 0 || (bumper.limit() > 0)) {
-                               //do not call process() if nothign was read
+                //do not call process() if nothing was read
                                //this actually is an error in fixkeys
                                if(size > 0) {
                                        bytesRead += (size+OVERHEAD);
@@ -373,6 +452,9 @@
                                                                        nc, t, Logger.NORMAL);
                                        status = -1;
                                }
+                if(logDebug)
+                    Core.logger.log(this, ""+chan+":"+nc+" returned "+
+                                    status, Logger.DEBUG);
                                if(status == -1) {
                                        if(logDebug)
                                                Core.logger.log(this, "Closing connection "+chan+":"+nc+

Index: ThrottledSelectorLoop.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/ThrottledSelectorLoop.java,v
retrieving revision 1.10.2.7
retrieving revision 1.10.2.8
diff -u -w -r1.10.2.7 -r1.10.2.8
--- ThrottledSelectorLoop.java  5 Aug 2003 19:41:29 -0000       1.10.2.7
+++ ThrottledSelectorLoop.java  28 Oct 2003 20:20:48 -0000      1.10.2.8
@@ -32,6 +32,8 @@
        
        int throttleQueueLength = 0;
        
+       protected boolean shortTimeout = false;
+       
        public final int throttleQueueLength() {
                return throttleQueueLength;
        }
@@ -56,11 +58,14 @@
     // sync on this to prevent SelectorLoop thread from un-throttling
     
     protected Bandwidth bw;
+       protected int timerGranularity;
     
-    public ThrottledSelectorLoop(Bandwidth bw) throws IOException {
+    public ThrottledSelectorLoop(Bandwidth bw, int timerGranularity) 
+               throws IOException {
        
        this.bw = bw;
        throttleDisabledQueue = new LinkedList();
+               this.timerGranularity = timerGranularity;
 //        rand = freenet.Core.randSource;
     }
     
@@ -76,16 +81,16 @@
        }
        
     public final void throttleBeforeSelect() {
-               boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+               logDebug = Core.logger.shouldLog(Logger.DEBUG, this);
                if (throttling) {
                        long now = System.currentTimeMillis();
-                       if(logDEBUG)
+                       if(logDebug)
                                Core.logger.log(this, "Still throttling at "+now,
                                                                Logger.DEBUG);
                        if(now >= reregisterThrottledTime) {
                                synchronized(throttleLock) {
                                        if(bytesRemainingOnThrottle > 0) {
-                                               if(logDEBUG)
+                                               if(logDebug)
                                                        Core.logger.log(this, "Calling 
throttle with remaining "+
                                                                                       
 bytesRemainingOnThrottle+" bytes", 
                                                                                       
 Logger.DEBUG);
@@ -94,7 +99,7 @@
                                                if(tok.sleepUntil > 0) {
                                                        reregisterThrottledTime = 
tok.sleepUntil;
                                                        bytesRemainingOnThrottle -= 
tok.availableNow;
-                                                       if(logDEBUG)
+                                                       if(logDebug)
                                                                Core.logger.log(this, 
"Still not ready - sleeping for "+
                                                                                       
         (reregisterThrottledTime - now)+" ms",
                                                                                       
         Logger.DEBUG);
@@ -110,10 +115,11 @@
                                   reregisterThrottledTime > now) {
                                        timeout = (int)(reregisterThrottledTime - now);
                                }
-                       }
+                       } else timeout = shortTimeout ? 0 : TIMEOUT;
                } else
-                       timeout = TIMEOUT;
-               if(logDEBUG)
+                       timeout = shortTimeout ? 0 : TIMEOUT;
+               shortTimeout = false;
+               if(logDebug)
                        Core.logger.log(this, "Set timeout to "+timeout,
                                                        Logger.DEBUG);
        }
@@ -158,7 +164,7 @@
                                        // Already thinks it is registered
                                }
                        } catch (CancelledKeyException e) {
-                               if(Core.logger.shouldLog(Logger.DEBUG))
+                               if(Core.logger.shouldLog(Logger.DEBUG,this))
                                        Core.logger.log(this, "Key ("+current+
                                                                        ") cancelled 
but not removed: "+e, e,
                                                                        Logger.ERROR);
@@ -168,7 +174,7 @@
                                queueClose(current);
                                closed++;
                        } catch (ClosedChannelException e) {
-                               if(Core.logger.shouldLog(Logger.DEBUG))
+                               if(Core.logger.shouldLog(Logger.DEBUG,this))
                                        Core.logger.log(this, "Key ("+current+
                                                                        ") channel 
closed but not removed: "+e,
                                                                        e, 
Logger.ERROR);
@@ -200,9 +206,9 @@
        
     protected final void throttleConnections(int bytesRead, int throttledBytesRead, int pseudoThrottledBytesRead) {
                if(bw == null) return;
-               boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+               logDebug = Core.logger.shouldLog(Logger.DEBUG, this);
                if(bytesRead > 0) {
-                       if(logDEBUG)
+                       if(logDebug)
                                Core.logger.log(this, "Bytes moved total this loop: "+
                                                                bytesRead+", bytes 
that need throttling: "+
                                                                throttledBytesRead+", 
pseudo-throttled: "+
@@ -252,7 +258,7 @@
                                        SelectionKey curKey = 
                                                (SelectionKey)(it.next());
                                        if(!curKey.isValid()) {
-                                               if(logDEBUG) Core.logger.log(this, 
"Invalid "+curKey+"("+curKey.channel()+","+curKey.attachment()+" on selector in 
throttleConnections, ignoring", Logger.DEBUG);
+                                               if(logDebug) Core.logger.log(this, 
"Invalid "+curKey+"("+curKey.channel()+","+curKey.attachment()+" on selector in 
throttleConnections, ignoring", Logger.DEBUG);
                                                onInvalidKey(curKey);
                                                continue;
                                        }
@@ -279,11 +285,11 @@
                                                        throttleQueueLength = 
throttleDisabledQueue.size();
                                                        deregistered++;
                                                }
-                                               if(logDEBUG) Core.logger.log(this, 
"Deregistered "+
+                                               if(logDebug) Core.logger.log(this, 
"Deregistered "+
                                                                                
curKey.attachment(), Logger.DEBUG);
                                        }
                                }
-                               if(logDEBUG)
+                               if(logDebug)
                                        Core.logger.log(this, "Deregistered 
"+deregistered+
                                                                        " keys, 
TDQ.size="+
                                                                        
throttleDisabledQueue.size(),
@@ -329,6 +335,13 @@
                        dontReregister.remove(ch);
                }
                super.register(ch, attachment);
+       }
+       
+       protected void queueClose(ClosePair chan) {
+               synchronized(dontReregister) {
+                       dontReregister.remove(chan.sc);
+               }
+               super.queueClose(chan);
        }
        
     /**

Index: WriteSelectorLoop.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/WriteSelectorLoop.java,v
retrieving revision 1.16.2.12
retrieving revision 1.16.2.13
diff -u -w -r1.16.2.12 -r1.16.2.13
--- WriteSelectorLoop.java      15 Aug 2003 03:05:56 -0000      1.16.2.12
+++ WriteSelectorLoop.java      28 Oct 2003 20:20:48 -0000      1.16.2.13
@@ -51,12 +51,24 @@
        private static final int TABLE_SIZE=512;
        private static final float TABLE_FACTOR=(float)0.6;
 
+       int minSendBytesPerThrottleCycle;
+       private static final int GRANULES_PER_THROTTLE_CYCLE = 20;
+       
        /**
         * nothing special about this constructor
         */
-       public WriteSelectorLoop(Bandwidth bw) throws IOException {
+       public WriteSelectorLoop(Bandwidth bw, int timerGranularity) 
+               throws IOException {
                
-               super(bw);
+               super(bw, timerGranularity);
+               minSendBytesPerThrottleCycle = 
+                       (bw.currentBandwidthPerSecondAllowed() * 
+                        GRANULES_PER_THROTTLE_CYCLE *
+                        timerGranularity) / 1000;
+               Core.logger.log(this, "Minimum send size: "+
+                                               minSendBytesPerThrottleCycle+" per "+
+                                               (GRANULES_PER_THROTTLE_CYCLE * timerGranularity)+
+                                               "ms", Logger.MINOR);
                jobs = new BlockingQueue();
                uniqueness = new Hashtable(TABLE_SIZE,TABLE_FACTOR);
                sorter=new QuickSorter();
@@ -68,6 +80,7 @@
        public WriteSelectorLoop() throws IOException {
                
                super();
+               minSendBytesPerThrottleCycle = Integer.MAX_VALUE;
                jobs = new BlockingQueue();
                uniqueness = new Hashtable(TABLE_SIZE,TABLE_FACTOR);
                sorter=new QuickSorter();
@@ -76,7 +89,7 @@
        protected Object idSync = new Object();
        protected long idCount = 0;
        public final static int maxDelay = 2000;
-       public static final int NEGOTIATION = -1;
+       public static final int NEGOTIATION = -10;
        public static final int MESSAGE = 0;
        public static final int TRAILER = 1;
        /**
@@ -174,7 +187,7 @@
                
                public final String toString() {
                        return SendJob.this.getClass().getName()+": "+data+","+destination+
-                               ","+client+","+position+","+id;
+                               ","+client+","+position+","+id+",prio="+priority;
                }
                
                public final void closed() {
@@ -235,7 +248,8 @@
         
                if (data.length==0 || destination ==null) throw new IOException ("no data to send?");
                
-               if (!checkValid(destination))return false;
+               if (!checkValid(destination, client))
+                       return false;
                
                SendJob job = new SendJob(data,destination,client, priority);
                
@@ -255,7 +269,8 @@
         
                if (len==0 || destination==null) throw new IOException ("no data to send?");
                
-               if (!checkValid(destination))return false;
+               if (!checkValid(destination, client))
+                       return false;
                
                SendJob job = new SendJob(data,offset,len,destination,client,priority);
                
@@ -270,13 +285,14 @@
                return true;
         }
         
-        private final boolean checkValid(SelectableChannel destination) {
+        private final boolean checkValid(SelectableChannel destination,
+                                                                         NIOWriter client) {
                //if we are already registered, return false
                SelectionKey key = destination.keyFor(sel);
                if (key != null && key.isValid()) {
-                       if(logDebug)
-                               Core.logger.log(this, "Send failed due to valid key",
-                                                               Logger.DEBUG);
+                       Core.logger.log(this, "Send failed due to valid key for "+
+                                                       destination+":"+client, 
+                                                       new Exception("debug"), Logger.ERROR);
                        return false;
                }
                
@@ -325,8 +341,7 @@
                 
                 //this queue is local to the thread.
                 LinkedList waitingJobs = new LinkedList();
-                boolean logDEBUG = logDebug;
-                if(logDEBUG)
+                if(logDebug)
                         Core.logger.log(this, "beforeSelect()", Logger.DEBUG);
                 // first check if any channel is registered in the 
                 // selector.
@@ -348,13 +363,13 @@
                                 if (sel.keys().isEmpty() && jobs.isEmpty()) {
                                         //check if the jobs queue is empty,
                                         //and if it is, block on it.
-                                        if(logDEBUG)
+                                        if(logDebug)
                                                 Core.logger.log(this, "Waiting for 
job to add to "+
                                                                                 
"queue", Logger.DEBUG);
                                         long delay = 0;
                                         synchronized(throttleLock) {
                                                 if(throttling) {
-                                                        if(logDEBUG)
+                                                        if(logDebug)
                                                                 Core.logger.log(this, 
"Throttling at "+now+
                                                                                       
          " until "+
                                                                                       
          reregisterThrottledTime, 
@@ -369,7 +384,7 @@
                                         }
                                         int x = (delay > Integer.MAX_VALUE) ? Integer.MAX_VALUE
                                                 : ((int)delay);
-                                        if(logDEBUG) 
+                                        if(logDebug) 
                                                 Core.logger.log(this, "Delay will be 
"+x+" ms: ",
                                                                                 
Logger.DEBUG);
                                         Object o = jobs.dequeue(x);
@@ -381,7 +396,7 @@
                                                                 ChannelAttachmentPair 
pair = 
                                                                         new 
ChannelAttachmentPair(current.destination, current);
                                                                 
throttleDisabledQueue.addLast(pair);
-                                                                if(logDEBUG)
+                                                                if(logDebug)
                                                                         
Core.logger.log(this, "Moving new job "+
                                                                                       
                  pair+" onto delay queue", 
                                                                                       
                  Logger.DEBUG);
@@ -389,7 +404,7 @@
                                                         }
                                                 }
                                                 waitingJobs.add(o);
-                                                if(logDEBUG)
+                                                if(logDebug)
                                                         Core.logger.log(this, 
"Dequeued job",
                                                                                       
  Logger.DEBUG);
                                                 continue;
@@ -408,20 +423,20 @@
                                                                 ChannelAttachmentPair 
pair = 
                                                                         new 
ChannelAttachmentPair(current.destination, current);
                                                                 
throttleDisabledQueue.addLast(pair);
-                                                                if(logDEBUG)
+                                                                if(logDebug)
                                                                         
Core.logger.log(this, "Moving new job "+
                                                                                       
                  pair+" onto delay queue", 
                                                                                       
                  Logger.DEBUG);
                                                                 continue;
                                                         }
                                                 }
-                                                if(logDEBUG)
+                                                if(logDebug)
                                                         Core.logger.log(this, 
"Copying job "+current+
                                                                                       
  " to queue", Logger.DEBUG);
                                                 waitingJobs.add(current);
                                         }
                                         long endTime = System.currentTimeMillis();
-                                        if(logDEBUG)
+                                        if(logDebug)
                                                 Core.logger.log(this, "Took 
"+(endTime-startTime)+
                                                                                 " 
millis copying queue ("+iter+")", 
                                                                                 
Logger.DEBUG);
@@ -439,15 +454,15 @@
                while (i.hasNext()) {
                        SendJob currentJob = (SendJob) i.next();
                        try {
-                               if(logDEBUG)
+                               if(logDebug)
                                        Core.logger.log(this, "Registering channel 
"+currentJob+
                                                                        " with 
selector", Logger.DEBUG);
                                currentJob.destination.register(sel, 
SelectionKey.OP_WRITE, currentJob);
-                               if(logDEBUG)
+                               if(logDebug)
                                        Core.logger.log(this, "Registered channel 
"+currentJob+
                                                                        " with 
selector", Logger.DEBUG);
                        }catch (ClosedChannelException e) {
-                               if(logDEBUG)
+                               if(logDebug)
                                        Core.logger.log(this, "Channel closed: 
"+currentJob+": "+e,
                                                                        e, 
Logger.DEBUG);
                                queueClose(((SocketChannel)currentJob.destination),
@@ -472,10 +487,6 @@
                }
         }
         
-       //check if any of the channels got closed;
-       //REDFLAG: the behavior of select() on channels that have
-       //been closed remotely is not tested.
-       //TEST PROPERLY AND IMPLEMENT CHECKS HERE!!!
        protected final boolean inspectChannels() {
                if(logDebug)
                        Core.logger.log(this, "inspectChannels()", Logger.DEBUG);
@@ -489,8 +500,7 @@
        //at this stage the selected set should contain only channels
        //that are ready to be written to and have something to be sent.
        protected final boolean processConnections() {
-               boolean logDEBUG = logDebug;
-               if(logDEBUG)
+               if(logDebug)
                        Core.logger.log(this, "processConnections()", Logger.DEBUG);
                boolean success = true;
                int throttledBytes = 0;
@@ -512,7 +522,6 @@
                try{
                        boolean noThrottled = (System.currentTimeMillis() < 
                                                                   
reregisterThrottledTime);
-                       boolean noMoreThrottled = false;
                        // Some of them may have been enableThrottle()d off thread
                while (i.hasNext()) {
                        boolean localSuccess = true;
@@ -538,12 +547,13 @@
                                if(currentJob.destination instanceof SocketChannel &&
                                   (!((SocketChannel)(currentJob.destination)).
                                        isConnected())) throw new IOException("not connected");
-                               if(currentJob.client.shouldThrottle()) {
-                                       if(noMoreThrottled) {
+                               if(bw != null && currentJob.client.shouldThrottle()) {
+                                       if(throttledBytes > minSendBytesPerThrottleCycle) {
                                                // Will get cancelled by throttleConnections
-                                               if(logDEBUG)
+                                               if(logDebug)
                                                        Core.logger.log(this, 
"Skipping (A) throttled "+
                                                                                       
 currentJob, Logger.DEBUG);
+                                               shortTimeout = true; // just in case the bytes are absorbed by the throttle
                                                continue;
                                        } else if(noThrottled) {
                                                // May not get cancelled by 
throttleConnections
@@ -571,13 +581,13 @@
                                        if(currentJob.data.remaining() > lim) {
                                                
currentJob.data.limit(currentJob.data.position() +
                                                                                       
   lim);
-                                               if(logDEBUG)
+                                               if(logDebug)
                                                        Core.logger.log(this, 
"Limited: "+currentJob.data+
                                                                                       
 " for "+currentJob.client+
                                                                                       
 ", limit was "+oldLimit, Logger.DEBUG);
                                        }
                                } else {
-                                       if(logDEBUG)
+                                       if(logDebug)
                                                Core.logger.log(this, "Did not limit, 
"+
                                                                                
currentJob.data.remaining()+"/"+
                                                                                
(bw==null?"(null)": Integer.
@@ -607,7 +617,7 @@
                                        if(currentJob.client.shouldThrottle()) {
                                                throttledBytes += (sent+OVERHEAD);
                                                totalWrittenThrottlableBytes += 
(sent+OVERHEAD);
-                                               if(logDEBUG) {
+                                               if(logDebug) {
                                                        Core.logger.log(this, "Should 
throttle "+
                                                                                       
 currentJob, Logger.DEBUG);
                                                        Core.logger.log(this, "Written 
"+sent+
@@ -632,11 +642,11 @@
                                                
if(currentJob.client.countAsThrottled()) {
                                                        pseudoThrottledBytes += 
(sent+OVERHEAD);
                                                        
totalWrittenPseudoThrottlableBytes += (sent+OVERHEAD);
-                                                       if(logDEBUG)
+                                                       if(logDebug)
                                                                Core.logger.log(this, 
"Pseudo-throttle "+
                                                                                       
         currentJob, Logger.DEBUG);
                                                } else {
-                                                       if(logDEBUG)
+                                                       if(logDebug)
                                                                Core.logger.log(this, 
"Should not throttle "+
                                                                                       
         currentJob, Logger.DEBUG);
                                                }
@@ -667,8 +677,16 @@
                                                                        curKey+") from 
uniqueness, now "+
                                                                        
uniqueness.size(), Logger.DEBUG);
                                        try {
-                                               
currentJob.client.jobDone(currentJob.data.position()-currentJob.position,
-                                                                                      
           localSuccess);
+                                               if(currentJob.data == null ||
+                                                  currentJob.client == null)
+                                                       Core.logger.log(this, 
"currentJob.data="+currentJob.data+
+                                                                                      
 ", currentJob.client="+currentJob.client,
+                                                                                      
 Logger.ERROR);
+                                               else {
+                                                       int delta = 
currentJob.data.position() -
+                                                               currentJob.position;
+                                                       
currentJob.client.jobDone(delta, localSuccess);
+                                               }
                                        } catch (Throwable t) {
                                                Core.logger.log(this, "Caught "+t+" 
notifying "+
                                                                                
currentJob.client+" for "+
@@ -685,17 +703,11 @@
                                                
currentJob.client.jobPartDone(currentJob.data.position()-currentJob.position);
                                }
                        }
-                       if(currentJob.client.shouldThrottle() && sent > 0) {
-                               // Throttle this one first
-                               if (logDebug)Core.logger.log(this, "Set 
noMoreThrottled because of "+
-                                                               currentJob, 
Logger.DEBUG);
-                               noMoreThrottled = true;
                        }
-               }
-               if(logDEBUG)
+               if(logDebug)
                        Core.logger.log(this, "Written "+bytesSent, Logger.DEBUG);
                if(throttledBytes != 0 || pseudoThrottledBytes != 0) {
-                       if(logDEBUG)
+                       if(logDebug)
                                Core.logger.log(this, "Written "+throttledBytes+
                                                                " bytes that should be 
throttled", 
                                                                Logger.DEBUG);
@@ -753,6 +765,7 @@
                c = (SocketChannel[])(uniqueness.keySet().toArray(c));
                for(int x=0;x<c.length;x++) {
                        SocketChannel chan = c[x];
+                       if(chan == null) continue; // race condition
                         if(!chan.isOpen()) {
                                 out.append("NOT OPEN: ").append(chan);
                                 Object o = uniqueness.get(chan);

Index: tcpAddress.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/tcpAddress.java,v
retrieving revision 1.4.4.5.2.5
retrieving revision 1.4.4.5.2.6
diff -u -w -r1.4.4.5.2.5 -r1.4.4.5.2.6
--- tcpAddress.java     29 Jul 2003 21:08:10 -0000      1.4.4.5.2.5
+++ tcpAddress.java     28 Oct 2003 20:20:48 -0000      1.4.4.5.2.6
@@ -23,12 +23,17 @@
     private InetAddress host;
     private String hostName = null;
     private int port;
+    private int hashCode; // has to be invariant with lookups etc
     public static boolean throttleAll = false;
 
     private String valname = "";
 
     private final tcpTransport transport;
 
+    public int hashCode() {
+       return hashCode;
+    }
+    
     public boolean equals(tcpAddress tcp) {
        if(tcp == null) return false;
        if(tcp.port != port) return false;
@@ -58,11 +63,15 @@
         this(transport);
         setPort(port);
         this.host = host;
-       if(host!=null)
+       if(host!=null) {
                valname = host.getHostAddress();
-       else
+        
+           hashCode = port ^ valname.hashCode();//host.hashCode() ^ valname.hashCode();
+       } else {
+           hashCode = port;
                valname = "";
     }
+    }
     
     /**
      * Creates an address from a host name or IP string and port number.
@@ -75,6 +84,7 @@
        host = null;
        //doDeferredLookup();
         valname = hostname;
+       hashCode = port ^ valname.hashCode();
     }
 
     /** 
@@ -96,6 +106,7 @@
             throw new BadAddressException(""+e);
         }
         setPort(Integer.parseInt(str.substring(colon + 1)));
+       hashCode = port ^ valname.hashCode();
     }
 
     public final void doDeferredLookup() throws UnknownHostException {

Index: tcpConnection.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/tcpConnection.java,v
retrieving revision 1.8.4.6.2.10
retrieving revision 1.8.4.6.2.11
diff -u -w -r1.8.4.6.2.10 -r1.8.4.6.2.11
--- tcpConnection.java  16 Aug 2003 17:30:08 -0000      1.8.4.6.2.10
+++ tcpConnection.java  28 Oct 2003 20:20:48 -0000      1.8.4.6.2.11
@@ -1,12 +1,15 @@
 package freenet.transport;
 
 import freenet.*;
+import freenet.node.Node; // for listenPort
+import freenet.node.Main; // for Main.timerGranularity... fixme?
 import freenet.support.io.*;
 import freenet.support.Logger;
 import java.io.*;
 import java.net.Socket;
 import java.net.InetAddress;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.nio.*;
 
 public final class tcpConnection extends Connection {
@@ -23,6 +26,7 @@
     private boolean shouldThrottle = false;
     private boolean shouldThrottleNow = false;
     private boolean instanceCounted = false; // if the constructor throws, WE STILL GET FINALIZED!
+    private static boolean logDEBUG;
     // We do not throttle in FnpLink because it slows down negotiations drastically
     // We do not directly throttle, ever, because of nio.
     // Hence throttling is implemented in *SelectorLoop, not here
@@ -40,11 +44,21 @@
     static WriteSelectorLoop wsl;
     private static Bandwidth ibw = null;
     private static Bandwidth obw = null;
+    private static boolean poolBuffers = true;
+    private static LinkedList bufferPool = new LinkedList();
+    private static boolean useDirectBuffers = false;
+    private static int BUFFER_SIZE = 16384;
     
     private static final int streamBufferSize() {
        return freenet.Core.streamBufferSize;
     }
     
+    public static int bufferPoolSize() {
+       synchronized(bufferPool) {
+           return bufferPool.size();
+       }
+    }
+    
     public static final HashMap socketConnectionMap = new HashMap();
     
     static synchronized public void setInputBandwidth(Bandwidth bw) {
@@ -63,17 +77,24 @@
        // Start NIO loops
        try {
            if(rsl == null) {
-               rsl = new ReadSelectorLoop(ibw);
+               if (ibw != null)
+                   rsl = new ReadSelectorLoop(ibw, Main.timerGranularity);
+               else
+                   rsl = new ReadSelectorLoop();
                Thread rslThread = new Thread(rsl, " read interface thread");
                rslThread.setDaemon(true);
-               rslThread.setPriority(Thread.MAX_PRIORITY);
+               //rslThread.setPriority(Thread.MAX_PRIORITY);
                rslThread.start(); // inactive until given registrations
            }
            if(wsl == null) {
-               wsl = new WriteSelectorLoop(obw);
+               if (obw != null)
+                   wsl = new WriteSelectorLoop(obw, Main.timerGranularity);
+               else
+                   wsl= new WriteSelectorLoop();
+               
                Thread wslThread = new Thread(wsl, " write interface thread");
                wslThread.setDaemon(true);
-               wslThread.setPriority(Thread.MAX_PRIORITY);
+               //wslThread.setPriority(Thread.MAX_PRIORITY);
                wslThread.start(); // inactive until given jobs
            }
        } catch (Throwable t) {
@@ -96,7 +117,7 @@
     public boolean countAsThrottled() { return shouldThrottle; };
     
     public void enableThrottle() { 
-       if(Core.logger.shouldLog(Logger.DEBUG))
+       if(Core.logger.shouldLog(Logger.DEBUG,this))
            Core.logger.log(this, "Enabling throttle for "+this, 
                            new Exception("debug"), Logger.DEBUG);
        shouldThrottleNow = shouldThrottle;
@@ -142,6 +163,35 @@
     
     public static boolean logBytes = false;
     
+    private final ByteBuffer getAccumulator() {
+       if(poolBuffers) {
+           synchronized(bufferPool) {
+               if(!bufferPool.isEmpty()) {
+                   accumulator = (ByteBuffer)(bufferPool.removeFirst());
+                   if(logDEBUG)
+                       Core.logger.log(this, "Reused buffer from pool: "+
+                                       accumulator, Logger.DEBUG);
+                   
+               } else {
+                   accumulator = useDirectBuffers ?
+                       ByteBuffer.allocateDirect(BUFFER_SIZE) :
+                       ByteBuffer.allocate(BUFFER_SIZE);
+                   if(logDEBUG)
+                       Core.logger.log(this, "Allocated new buffer because pool empty: "+
+                                       accumulator, Logger.DEBUG);
+               }
+           }
+       } else {
+           accumulator = useDirectBuffers ?
+               ByteBuffer.allocateDirect(BUFFER_SIZE) :
+               ByteBuffer.allocate(BUFFER_SIZE);
+           if(logDEBUG)
+               Core.logger.log(this, "Allocated new buffer because not pooling: "+
+                               accumulator, Logger.DEBUG);
+       }
+       return accumulator;
+    }
+    
     /**
      * Used to create an outbound connection.
      */
@@ -150,7 +200,7 @@
        throws ConnectFailedException {
         this(t);
        
-       boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+       logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
        if(logDEBUG)
            Core.logger.log(this, "tcpConnection (outbound)", 
                            new Exception("debug"), Logger.DEBUG);
@@ -173,7 +223,7 @@
                                                  
            /** NIO related stuff***/
            sock.getChannel().configureBlocking(false);
-           accumulator = ByteBuffer.allocate(16*1024); //FIXME:hardcoded
+           accumulator = getAccumulator();
            accumulator.limit(0).position(0);
            nioout = new NIOOutputStream(sock.getChannel(),this);
            nioin = new NIOInputStream(accumulator,sock.getChannel(),this);
@@ -274,7 +324,7 @@
     
     protected final void logInstances(String s) {
        synchronized(profLock) {
-               if(Core.logger.shouldLog(Logger.DEBUG))
+               if(Core.logger.shouldLog(Logger.DEBUG,this))
            Core.logger.log(this, s+" ("+this+") instances: "+instances+
                            ", openInstances: "+openInstances+
                            ", created: "+createdInstances+
@@ -292,7 +342,7 @@
     tcpConnection(tcpTransport t, Socket sock, int designator,
                  boolean dontThrottle, boolean doThrottle) throws IOException {
         this(t);
-       boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+       logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
        if(logDEBUG)
            Core.logger.log(this, "tcpConnection (inbound)", Logger.DEBUG);
         
@@ -301,7 +351,7 @@
        
        /** NIO related stuff***/
        sock.getChannel().configureBlocking(false);
-       accumulator = ByteBuffer.allocate(16*1024); //FIXME:hardcoded
+       accumulator = getAccumulator();
        accumulator.limit(0).position(0);
        nioout = new NIOOutputStream(sock.getChannel(),this);
        nioin = new NIOInputStream(accumulator,sock.getChannel(),this);
@@ -392,7 +442,8 @@
      * i.e. position = 0, limit = end of bytes available ("flipped")
      */
     public ByteBuffer getInputBuffer() {
-       return accumulator;
+       if(closed) return null;
+       else return accumulator;
     }
     
     Exception closeException;
@@ -403,7 +454,7 @@
     
     public final void close(boolean fromCloseThread) {
        closeException = new Exception("debug");
-       boolean logDEBUG =Core.logger.shouldLog(Logger.DEBUG);  
+       logDEBUG =Core.logger.shouldLog(Logger.DEBUG,this);  
        if(logDEBUG)
            Core.logger.log(this, "Closing("+fromCloseThread+
                            ") tcpConnection "+this, 
@@ -484,13 +535,28 @@
        if(finalized) return;
        finalized = true;
        logInstances("Finalizing");
-       if(!closed) Core.logger.log(this, "finalized without being closed!"+this, Logger.NORMAL);
+       if(!closed) Core.logger.log(this, "finalized without being closed!"+this, 
+                                   Logger.NORMAL);
+       // Accumulator will not be reused after closure
+       if(poolBuffers) {
+           if(accumulator != null) {
+               synchronized(bufferPool) {
+                   bufferPool.addLast(accumulator);
+                   accumulator = null;
+               }
+           } else {
+               if(logDEBUG)
+                   Core.logger.log(this, "Not repooling accumulator because is null: "+
+                                   this, Logger.DEBUG);
+           }
+       }
        try {
            close(true);
        } catch (Throwable t) {
            Core.logger.log(this, "Caught "+t+" closing "+this+" in finalize()", t, Logger.NORMAL);
        }
-       if(!reallyClosed) Core.logger.log(this, "finalized without being reallyClosed!: "+this, Logger.NORMAL);
+       if(!reallyClosed) Core.logger.log(this, "finalized without being reallyClosed!: "+this,
+                                         Logger.NORMAL);
        //profiling
        //WARNING:remove before release
        synchronized(profLock) {
@@ -503,11 +569,13 @@
     }
     
     public final InputStream getIn() {
-        return in;
+       if(closed) return null;
+        else return in;
     }
     
     public final NIOInputStream getUnderlyingIn() {
-       return nioin;
+       if(closed) return null;
+       else return nioin;
     }
     
     public final OutputStream getOut() {
@@ -520,7 +588,7 @@
             sock.setSoTimeout(timeout);
         else
             throw new IOException("Already closed "+this);
-       if(Core.logger.shouldLog(Logger.DEBUG))
+       if(Core.logger.shouldLog(Logger.DEBUG,this))
            Core.logger.log(this, "Set SO_TIMEOUT to "+timeout+" for "+this,
                            Logger.DEBUG);
     }
@@ -580,10 +648,18 @@
            sb.append("CLOSED"); //you won't believe it till you see it with
        } else if (sock!=null) {
            InetAddress addr = sock.getInetAddress();
+           int localPort = sock.getLocalPort();
+           if (localPort != Node.listenPort) {
+               sb.append(localPort);
+               sb.append(">");
+           }
            if(addr != null) //this is becoming increasingly common --zab
                sb.append(addr.getHostAddress());
            sb.append(":");
            sb.append(sock.getPort());
+           if (localPort == Node.listenPort) {
+               sb.append(">local");
+           }
        } else sb.append("null");
        sb.append(","+super.toString());
         return sb.toString();

Index: tcpTransport.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/transport/tcpTransport.java,v
retrieving revision 1.1.1.1.4.4.2.2
retrieving revision 1.1.1.1.4.4.2.3
diff -u -w -r1.1.1.1.4.4.2.2 -r1.1.1.1.4.4.2.3
--- tcpTransport.java   15 Jul 2003 14:29:40 -0000      1.1.1.1.4.4.2.2
+++ tcpTransport.java   28 Oct 2003 20:20:48 -0000      1.1.1.1.4.4.2.3
@@ -56,7 +56,7 @@
 
     public static boolean checkAddress(int[] i) {
        // ip address (IPV6 is not supported by this transport)
-       boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+       boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG,tcpTransport.class);
        if(logDEBUG)
            Core.logger.log(tcpTransport.class, "Checking "+i[0]+"."+i[1]+"."+i[2]+
                            "."+i[3], Logger.DEBUG);
@@ -101,7 +101,7 @@
     }
     
     public boolean checkAddress(String s, boolean noPort) {
-       boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG);
+       boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
        if(logDEBUG)
            Core.logger.log(this, "Checking "+s, Logger.DEBUG);
        String a = s;

_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to