Author: toad
Date: 2007-06-27 21:00:20 +0000 (Wed, 27 Jun 2007)
New Revision: 13784

Added:
   trunk/freenet/src/freenet/io/comm/MessageCore.java
   trunk/freenet/src/freenet/io/comm/PacketSocketHandler.java
   trunk/freenet/src/freenet/io/comm/SocketHandler.java
   trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java
Removed:
   trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
Modified:
   trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java
   trunk/freenet/src/freenet/io/comm/MessageFilter.java
   trunk/freenet/src/freenet/io/comm/PeerContext.java
   trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
   trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
   trunk/freenet/src/freenet/node/FNPPacketMangler.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeIPDetector.java
   trunk/freenet/src/freenet/node/NodeStats.java
   trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/simulator/RealNodeRequestInsertTest.java
   trunk/freenet/src/freenet/node/simulator/RealNodeRoutingTest.java
Log:
Refactor UdpSocketManager into MessageCore and UdpSocketHandler (one per 
socket/port, we will have multiple sockets/ports with opennet).
Useful for opennet, also useful for alternate transports later on.
Untested.

Modified: trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java    2007-06-27 
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java    2007-06-27 
21:00:20 UTC (rev 13784)
@@ -10,7 +10,7 @@
 import java.util.Iterator;

 import freenet.client.HighLevelSimpleClient;
-import freenet.io.comm.UdpSocketManager;
+import freenet.io.comm.MessageCore;
 import freenet.l10n.L10n;
 import freenet.node.Node;
 import freenet.node.NodeClientCore;
@@ -27,7 +27,7 @@

        private NodeClientCore core;

-       private UdpSocketManager usm;
+       private MessageCore usm;

        protected N2NTMToadlet(Node n, NodeClientCore core,
                        HighLevelSimpleClient client) {

Copied: trunk/freenet/src/freenet/io/comm/MessageCore.java (from rev 13775, 
trunk/freenet/src/freenet/io/comm/UdpSocketManager.java)
===================================================================
--- trunk/freenet/src/freenet/io/comm/MessageCore.java                          
(rev 0)
+++ trunk/freenet/src/freenet/io/comm/MessageCore.java  2007-06-27 21:00:20 UTC 
(rev 13784)
@@ -0,0 +1,494 @@
+/*
+ * Dijjer - A Peer to Peer HTTP Cache
+ * Copyright (C) 2004,2005 Change.Tv, Inc
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+package freenet.io.comm;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Vector;
+
+import freenet.node.PeerNode;
+import freenet.node.Ticker;
+import freenet.support.Logger;
+import freenet.support.TimeUtil;
+
+public class MessageCore {
+
+       public static final String VERSION = "$Id: MessageCore.java,v 1.22 
2005/08/25 17:28:19 amphibian Exp $";
+       private static boolean logMINOR; 
+       private Dispatcher _dispatcher;
+       /** _filters serves as lock for both */
+       private final LinkedList _filters = new LinkedList();
+       private final LinkedList _unclaimed = new LinkedList();
+       private static final int MAX_UNMATCHED_FIFO_SIZE = 50000;
+       private static final long MAX_UNCLAIMED_FIFO_ITEM_LIFETIME = 
60*60*1000;  // 1 hour
+       // Every second, remove all timed out filters
+       private static final int FILTER_REMOVE_TIME = 1000;
+
+       /**
+        * Create a MessageCore with no dispatcher set. Refreshes the cached
+        * MINOR log-level flag and allocates the scratch list that
+        * removeTimedOutFilters() fills on each sweep.
+        */
+       public MessageCore() {
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               // Initial capacity only; the Vector grows on demand.
+               _timedOutFilters = new Vector(32);
+       }
+
+       /**
+        * Decode a packet from data and a peer.
+        * Can be called by IncomingPacketFilter's.
+     * @param data
+     * @param offset
+     * @param length
+     * @param peer
+     */
+    public Message decodeSingleMessage(byte[] data, int offset, int length, 
PeerContext peer, int overhead) {
+        try {
+            return Message.decodeMessageFromPacket(data, offset, length, peer, 
overhead);
+        } catch (Throwable t) {
+            Logger.error(this, "Could not decode packet: "+t, t);
+            return null;
+        }
+    }
+
+    /** Only used by removeTimedOutFilters() - if future code uses this 
elsewhere, we need to
+     * reconsider its locking. */
+    private final Vector _timedOutFilters;
+    
+    public void start(final Ticker ticker) {
+       ticker.queueTimedJob(new Runnable() {
+
+                       public void run() {
+                               try {
+                                       removeTimedOutFilters();
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Failed to remove 
timed out filters: "+t, t);
+                               } finally {
+                                       ticker.queueTimedJob(this, 
FILTER_REMOVE_TIME);
+                               }
+                               // TODO Auto-generated method stub
+                               
+                       }
+               
+       }, FILTER_REMOVE_TIME);
+    }
+    
+    /**
+     * Remove timed out filters and notify each of them.
+     * Expired filters are unlinked from _filters while holding the _filters
+     * lock; setMessage(null) and onTimedOut() are then called on each one
+     * after the lock has been released.
+     * A filter whose notification throws is logged and skipped, so that the
+     * remaining filters are still notified, and the scratch list is always
+     * emptied so that no filter is notified again by a later sweep.
+     */
+       void removeTimedOutFilters() {
+               long tStart = System.currentTimeMillis();
+               synchronized (_filters) {
+                       for (ListIterator i = _filters.listIterator(); i.hasNext();) {
+                               MessageFilter f = (MessageFilter) i.next();
+                               if (!f.timedOut()) {
+                                       // Because _filters are in order of timeout, we can abort
+                                       // the iteration as soon as we find one that doesn't timeout
+                                       break;
+                               }
+                               i.remove();
+                               _timedOutFilters.add(f);
+                       }
+               }
+
+               try {
+                       for(int i=0;i<_timedOutFilters.size();i++) {
+                               MessageFilter f = (MessageFilter) _timedOutFilters.get(i);
+                               try {
+                                       f.setMessage(null);
+                                       f.onTimedOut();
+                               } catch (Throwable t) {
+                                       // One failing callback must not stop the others from being notified.
+                                       Logger.error(this, "Caught "+t+" while notifying timed out filter "+f, t);
+                               }
+                       }
+               } finally {
+                       // Never leave stale entries behind: they would be notified again next sweep.
+                       _timedOutFilters.clear();
+               }
+
+               long tEnd = System.currentTimeMillis();
+               if(tEnd - tStart > 50) {
+                       if(tEnd - tStart > 3000)
+                               Logger.error(this, "removeTimedOutFilters took "+(tEnd-tStart)+"ms");
+                       else
+                               if(logMINOR) Logger.minor(this, "removeTimedOutFilters took "+(tEnd-tStart)+"ms");
+               }
+       }
+
+       /**
+        * Dispatch a message to a waiting filter, or feed it to the
+        * Dispatcher if none are found. A message claimed by neither is
+        * appended to the unclaimed FIFO so that a filter registered later
+        * can still pick it up.
+        * @param m The Message to dispatch.
+        * @param from The socket handler the message arrived on; only used for
+        * log output in this method.
+        */
+       public void checkFilters(Message m, PacketSocketHandler from) {
+               long tStart = System.currentTimeMillis();
+               if(logMINOR) Logger.minor(this, "checkFilters: "+m+" from "+m.getSource());
+               if ((m.getSource()) instanceof PeerNode)
+               {
+                       ((PeerNode)m.getSource()).addToLocalNodeReceivedMessagesFromStatistic(m);
+               }
+               if (logMINOR && !m.getSpec().equals(DMT.packetTransmit)) {
+                       String line = "" + (System.currentTimeMillis() % 60000) + ' ' + from + " <- "
+                                       + m.getSource() + " : " + m;
+                       boolean pingOrPong = m.getSpec().equals(DMT.ping) || m.getSpec().equals(DMT.pong);
+                       // Pings and pongs are demoted to DEBUG when DEBUG logging is enabled.
+                       if (pingOrPong && Logger.shouldLog(Logger.DEBUG, this))
+                               Logger.debug(this, line);
+                       else
+                               Logger.minor(this, line);
+               }
+               MessageFilter match;
+               synchronized (_filters) {
+                       match = removeFirstMatchingFilter(m);
+               }
+               boolean matched = (match != null);
+               if(matched) {
+                       // Callbacks run outside the _filters lock.
+                       match.setMessage(m);
+                       match.onMatched();
+               } else if (_dispatcher != null) {
+                       // Feed unmatched messages to the dispatcher
+                       try {
+                               if(logMINOR) Logger.minor(this, "Feeding to dispatcher: "+m);
+                               matched = _dispatcher.handleMessage(m);
+                       } catch (Throwable t) {
+                               Logger.error(this, "Dispatcher threw "+t, t);
+                       }
+               }
+               // Keep the last few _unclaimed messages around in case the intended receiver isn't receiving yet
+               if (!matched) {
+                       if(logMINOR) Logger.minor(this, "Unclaimed: "+m);
+                       /* Check filters and then add to _unclaimed is ATOMIC.
+                        * It has to be atomic, because otherwise we can get a
+                        * race condition that results in timeouts on MFs.
+                        *
+                        * Specifically:
+                        * - Thread A receives packet
+                        * - Thread A checks filters. It doesn't match any.
+                        * - Thread A feeds to Dispatcher.
+                        * - Thread B creates filter.
+                        * - Thread B checks _unclaimed.
+                        * - Thread B adds filter.
+                        * - Thread B sleeps.
+                        * - Thread A returns from Dispatcher. Which didn't match.
+                        * - Thread A adds to _unclaimed.
+                        *
+                        * OOPS!
+                        * The only way to fix this is to have checking the
+                        * filters and _unclaimed be a single atomic operation.
+                        * Another race is possible if we merely recheck the
+                        * filters after we return from dispatcher, for example.
+                        */
+                       synchronized (_filters) {
+                               if(logMINOR) Logger.minor(this, "Rechecking filters and adding message");
+                               match = removeFirstMatchingFilter(m);
+                               if(match != null) {
+                                       matched = true;
+                               } else {
+                                       appendUnclaimed(m);
+                               }
+                       }
+                       if(match != null) {
+                               match.setMessage(m);
+                               match.onMatched();
+                       }
+               }
+               long tEnd = System.currentTimeMillis();
+               if(tEnd - tStart > 50) {
+                       if(tEnd - tStart > 3000)
+                               Logger.error(this, "checkFilters took "+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for matched: "+matched);
+                       else
+                               if(logMINOR) Logger.minor(this, "checkFilters took "+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for matched: "+matched);
+               }
+       }
+
+       /**
+        * Unlink and return the first filter in _filters that matches m.
+        * Only one match is permitted per message. Caller must hold the
+        * _filters lock.
+        * @return The removed filter, or null if no filter matched.
+        */
+       private MessageFilter removeFirstMatchingFilter(Message m) {
+               for (ListIterator i = _filters.listIterator(); i.hasNext();) {
+                       MessageFilter f = (MessageFilter) i.next();
+                       if (f.match(m)) {
+                               i.remove();
+                               if(logMINOR) Logger.minor(this, "Matched: "+f);
+                               return f;
+                       }
+               }
+               return null;
+       }
+
+       /**
+        * Append m to the unclaimed FIFO, first dropping the oldest entries so
+        * that the FIFO never holds more than MAX_UNMATCHED_FIFO_SIZE messages.
+        * Caller must hold the _filters lock.
+        */
+       private void appendUnclaimed(Message m) {
+               // ">=" rather than ">": make room before adding, so the cap is not exceeded by one.
+               while (_unclaimed.size() >= MAX_UNMATCHED_FIFO_SIZE) {
+                       Message removed = (Message)_unclaimed.removeFirst();
+                       long messageLifeTime = System.currentTimeMillis() - removed.localInstantiationTime;
+                       if ((removed.getSource()) instanceof PeerNode) {
+                               Logger.normal(this, "Dropping unclaimed from "+removed.getSource().getPeer()+", lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (quantity)"+": "+removed);
+                       } else {
+                               Logger.normal(this, "Dropping unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (quantity)"+": "+removed);
+                       }
+               }
+               _unclaimed.addLast(m);
+               if(logMINOR) Logger.minor(this, "Done");
+       }
+       
+       /**
+        * IncomingPacketFilter should call this when a node is disconnected.
+        * Every filter for which matchesDroppedConnection(ctx) is true is
+        * unlinked under the _filters lock and then has onDroppedConnection(ctx)
+        * called on it once the lock is released.
+        * @param ctx The peer that was disconnected.
+        */
+       public void onDisconnect(PeerContext ctx) {
+               // Allocated lazily: this is a rare operation and usually nothing matches.
+               Vector affected = null;
+               synchronized(_filters) {
+                       for (ListIterator it = _filters.listIterator(); it.hasNext();) {
+                               MessageFilter candidate = (MessageFilter) it.next();
+                               if(!candidate.matchesDroppedConnection(ctx))
+                                       continue;
+                               it.remove();
+                               if(affected == null)
+                                       affected = new Vector();
+                               affected.add(candidate);
+                       }
+               }
+               if(affected == null)
+                       return;
+               // Notify in removal order, outside the lock.
+               for (ListIterator it = affected.listIterator(); it.hasNext();) {
+                       ((MessageFilter) it.next()).onDroppedConnection(ctx);
+               }
+       }
+       
+       /**
+        * IncomingPacketFilter should call this when a node connects with a new
+        * boot ID. Every filter for which matchesRestartedConnection(ctx) is
+        * true is unlinked under the _filters lock and then has
+        * onRestartedConnection(ctx) called on it once the lock is released.
+        * @param ctx The peer that restarted.
+        */
+       public void onRestart(PeerContext ctx) {
+               // Allocated lazily: this is a rare operation and usually nothing matches.
+               Vector affected = null;
+               synchronized(_filters) {
+                       for (ListIterator it = _filters.listIterator(); it.hasNext();) {
+                               MessageFilter candidate = (MessageFilter) it.next();
+                               if(!candidate.matchesRestartedConnection(ctx))
+                                       continue;
+                               it.remove();
+                               if(affected == null)
+                                       affected = new Vector();
+                               affected.add(candidate);
+                       }
+               }
+               if(affected == null)
+                       return;
+               // Notify in removal order, outside the lock.
+               for (ListIterator it = affected.listIterator(); it.hasNext();) {
+                       ((MessageFilter) it.next()).onRestartedConnection(ctx);
+               }
+       }
+
+       public void addAsyncFilter(MessageFilter filter, 
AsyncMessageFilterCallback callback) throws DisconnectedException {
+               filter.setAsyncCallback(callback);
+               filter.onStartWaiting();
+               boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
+               if(logDEBUG) Logger.debug(this, "Adding async filter "+filter+" 
for "+callback);
+               Message ret = null;
+               if(filter._source != null && (!filter._source.isConnected()) &&
+                       filter.matchesDroppedConnection(filter._source))
+                   throw new DisconnectedException();
+               // Check to see whether the filter matches any of the recently 
_unclaimed messages
+               // Drop any _unclaimed messages that the filter doesn't match 
that are also older than MAX_UNCLAIMED_FIFO_ITEM_LIFETIME
+               long now = System.currentTimeMillis();
+               long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
+               long messageLifeTime = 0;
+               synchronized (_filters) {
+                       if(logMINOR) Logger.minor(this, "Checking _unclaimed");
+                       for (ListIterator i = _unclaimed.listIterator(); 
i.hasNext();) {
+                               Message m = (Message) i.next();
+                               if (filter.match(m)) {
+                                       i.remove();
+                                       ret = m;
+                                       if(logMINOR) Logger.debug(this, 
"Matching from _unclaimed");
+                                       break;
+                               } else if (m.localInstantiationTime < 
messageDropTime) {
+                                       i.remove();
+                                       messageLifeTime = now - 
m.localInstantiationTime;
+                                       if ((m.getSource()) instanceof 
PeerNode) {
+                                               Logger.normal(this, "Dropping 
unclaimed from "+m.getSource().getPeer()+", lived 
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
+                                       } else {
+                                               Logger.normal(this, "Dropping 
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": 
"+m);
+                                       }
+                               }
+                       }
+                       if (ret == null) {
+                               if(logMINOR) Logger.minor(this, "Not in 
_unclaimed");
+                           // Insert filter into filter list in order of 
timeout
+                               ListIterator i = _filters.listIterator();
+                               while (true) {
+                                       if (!i.hasNext()) {
+                                               i.add(filter);
+                                               if(logMINOR) Logger.minor(this, 
"Added at end");
+                                               break;
+                                       }
+                                       MessageFilter mf = (MessageFilter) 
i.next();
+                                       if (mf.getTimeout() > 
filter.getTimeout()) {
+                                               i.previous();
+                                               i.add(filter);
+                                               if(logMINOR) Logger.minor(this, 
"Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
+                                               break;
+                                       }
+                               }
+                       }
+               }
+               if(ret != null) {
+                       filter.onMatched();
+                       filter.clearMatched();
+               }
+       }
+
+       /**
+        * Wait for a filter to trigger, or timeout. Blocks until either the 
trigger is activated, or it times
+        * out, or the peer is disconnected.
+        * @param filter The filter to wait for.
+        * @param ctr Byte counter to add bytes from the message to.
+        * @return Either a message, or null if the filter timed out.
+        * @throws DisconnectedException If the single peer being waited for 
disconnects.
+        */
+       public Message waitFor(MessageFilter filter, ByteCounter ctr) throws 
DisconnectedException {
+               boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
+               if(logDEBUG) Logger.debug(this, "Waiting for "+filter);
+               long startTime = System.currentTimeMillis();
+               filter.onStartWaiting();
+               Message ret = null;
+               if(filter._source != null && (!filter._source.isConnected()) &&
+                       filter.matchesDroppedConnection(filter._source))
+                   throw new DisconnectedException();
+               // Check to see whether the filter matches any of the recently 
_unclaimed messages
+               // Drop any _unclaimed messages that the filter doesn't match 
that are also older than MAX_UNCLAIMED_FIFO_ITEM_LIFETIME
+               long now = System.currentTimeMillis();
+               long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
+               long messageLifeTime = 0;
+               synchronized (_filters) {
+                       if(logMINOR) Logger.minor(this, "Checking _unclaimed");
+                       for (ListIterator i = _unclaimed.listIterator(); 
i.hasNext();) {
+                               Message m = (Message) i.next();
+                               if (filter.match(m)) {
+                                       i.remove();
+                                       ret = m;
+                                       if(logMINOR) Logger.debug(this, 
"Matching from _unclaimed");
+                                       break;
+                               } else if (m.localInstantiationTime < 
messageDropTime) {
+                                       i.remove();
+                                       messageLifeTime = now - 
m.localInstantiationTime;
+                                       if ((m.getSource()) instanceof 
PeerNode) {
+                                               Logger.normal(this, "Dropping 
unclaimed from "+m.getSource().getPeer()+", lived 
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
+                                       } else {
+                                               Logger.normal(this, "Dropping 
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": 
"+m);
+                                       }
+                               }
+                       }
+                       if (ret == null) {
+                               if(logMINOR) Logger.minor(this, "Not in 
_unclaimed");
+                           // Insert filter into filter list in order of 
timeout
+                               ListIterator i = _filters.listIterator();
+                               while (true) {
+                                       if (!i.hasNext()) {
+                                               i.add(filter);
+                                               if(logMINOR) Logger.minor(this, 
"Added at end");
+                                               break;
+                                       }
+                                       MessageFilter mf = (MessageFilter) 
i.next();
+                                       if (mf.getTimeout() > 
filter.getTimeout()) {
+                                               i.previous();
+                                               i.add(filter);
+                                               if(logMINOR) Logger.minor(this, 
"Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
+                                               break;
+                                       }
+                               }
+                       }
+               }
+               long tEnd = System.currentTimeMillis();
+               if(tEnd - now > 50) {
+                       if(tEnd - now > 3000)
+                               Logger.error(this, "waitFor _unclaimed 
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" 
for ret of "+ret);
+                       else
+                               if(logMINOR) Logger.minor(this, "waitFor 
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of 
"+_unclaimed.size()+" for ret of "+ret);
+               }
+               // Unlock to wait on filter
+               // Waiting on the filter won't release the outer lock
+               // So we have to release it here
+               if(ret == null) {       
+                       if(logMINOR) Logger.minor(this, "Waiting...");
+                       synchronized (filter) {
+                               try {
+                                       // Precaution against filter getting 
matched between being added to _filters and
+                                       // here - bug discovered by Mason
+                                   boolean fmatched = false;
+                                   while(!(fmatched = (filter.matched() || 
(filter.droppedConnection() != null)))) {
+                                       long wait = 
filter.getTimeout()-System.currentTimeMillis();
+                                       if(wait > 0)
+                                           filter.wait(wait);
+                                       else break;
+                                       }
+                                   if(filter.droppedConnection() != null)
+                                       throw new DisconnectedException();
+                                   if(logMINOR) Logger.minor(this, "Matched: 
"+fmatched);
+                               } catch (InterruptedException e) {
+                               }
+                               ret = filter.getMessage();
+                               filter.clearMatched();
+                       }
+                       if(logDEBUG) Logger.debug(this, "Returning "+ret+" from 
"+filter);
+               } else {
+                       // Matched an unclaimed packet
+                       filter.onMatched();
+                       filter.clearMatched();
+               }
+               // Probably get rid...
+//             if (Dijjer.getDijjer().getDumpMessageWaitTimes() != null) {
+//                     
Dijjer.getDijjer().getDumpMessageWaitTimes().println(filter.toString() + "\t" + 
filter.getInitialTimeout() + "\t"
+//                                     + (System.currentTimeMillis() - 
startTime));
+//                     Dijjer.getDijjer().getDumpMessageWaitTimes().flush();
+//             }
+               long endTime = System.currentTimeMillis();
+               if(logDEBUG) Logger.debug(this, "Returning in 
"+(endTime-startTime)+"ms");
+               if((ctr != null) && (ret != null))
+                       ctr.receivedBytes(ret._receivedByteCount);
+               return ret;
+       }
+
+       /**
+        * Send a Message to a PeerContext. A message whose spec is
+        * internal-only is not sent; an error is logged instead.
+        * @param destination The peer to send to.
+        * @param m The message to send.
+        * @param ctr Byte counter handed on to sendAsync.
+        * @throws NotConnectedException If we are not currently connected to the node.
+        */
+       public void send(PeerContext destination, Message m, ByteCounter ctr) throws NotConnectedException {
+               if(!m.getSpec().isInternalOnly()) {
+                       destination.sendAsync(m, null, 0, ctr);
+               } else {
+                       // The Exception is created only so that the log shows who called us.
+                       Logger.error(this, "Trying to send internal-only message "+m+" of spec "+m.getSpec(), new Exception("debug"));
+               }
+       }
+
+       /**
+        * Set the Dispatcher that checkFilters() offers a message to when no
+        * waiting filter matches it.
+        * @param d The dispatcher to use; while it is null no dispatcher is consulted.
+        */
+       public void setDispatcher(Dispatcher d) {
+               this._dispatcher = d;
+       }
+
+       /**
+        * Read the size of the unclaimed FIFO while holding the _filters lock.
+        * @return the number of received messages that are currently unclaimed
+        */
+       public int getUnclaimedFIFOSize() {
+               final int unclaimedCount;
+               synchronized (_filters){
+                       unclaimedCount = _unclaimed.size();
+               }
+               return unclaimedCount;
+       }
+       
+       public Map getUnclaimedFIFOMessageCounts() {
+               Map messageCounts = new HashMap();
+               synchronized(_filters) {
+                       for (ListIterator i = _unclaimed.listIterator(); 
i.hasNext();) {
+                               Message m = (Message) i.next();
+                               String messageName = m.getSpec().getName();
+                               Integer messageCount = (Integer) 
messageCounts.get(messageName);
+                               if (messageCount == null) {
+                                       messageCounts.put(messageName, new 
Integer(1) );
+                               } else {
+                                       messageCount = new 
Integer(messageCount.intValue() + 1);
+                                       messageCounts.put(messageName, 
messageCount );
+                               }
+                       }
+               }
+               return messageCounts;
+       }
+}

Modified: trunk/freenet/src/freenet/io/comm/MessageFilter.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/MessageFilter.java        2007-06-27 
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/comm/MessageFilter.java        2007-06-27 
21:00:20 UTC (rev 13784)
@@ -166,6 +166,7 @@
        }

        public boolean match(Message m) {
+               if(timedOut()) return false;
                if ((_or != null) && (_or.match(m))) {
                        _matched = true;
                        return true;

Added: trunk/freenet/src/freenet/io/comm/PacketSocketHandler.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PacketSocketHandler.java                  
        (rev 0)
+++ trunk/freenet/src/freenet/io/comm/PacketSocketHandler.java  2007-06-27 
21:00:20 UTC (rev 13784)
@@ -0,0 +1,32 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.io.comm;
+
+import freenet.io.comm.Peer.LocalAddressException;
+
+/**
+ * Base interface for UdpSocketHandler and any other datagram-based transports.
+ */
+public interface PacketSocketHandler extends SocketHandler {
+
+       /** The maximum size of a packet, not including transport layer headers. */
+       int getMaxPacketSize();
+
+       /**
+        * Send a block of encoded bytes to a peer. This is called by
+        * send, and by IncomingPacketFilter.processOutgoing(..).
+     * @param blockToSend The data block to send.
+     * @param destination The peer to send it to.
+     * @param allowLocalAddresses NOTE(review): presumably whether a local
+     * address is acceptable as the destination - confirm against implementations.
+     * @throws LocalAddressException NOTE(review): presumably when the destination
+     * is a local address and allowLocalAddresses is false - confirm against implementations.
+     */
+    public void sendPacket(byte[] blockToSend, Peer destination, boolean 
+allowLocalAddresses) throws LocalAddressException;
+
+    /**
+     * Get the size of the transport layer headers, for byte accounting purposes.
+     */
+       int getHeadersLength();
+
+       /** Set the decryption filter to which incoming packets will be fed. */
+       public void setLowLevelFilter(IncomingPacketFilter f);
+
+}

Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java  2007-06-27 17:55:09 UTC 
(rev 13783)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java  2007-06-27 21:00:20 UTC 
(rev 13784)
@@ -4,6 +4,7 @@
 package freenet.io.comm;

 import freenet.io.xfer.PacketThrottle;
+import freenet.node.OutgoingPacketMangler;

 /**
  * @author amphibian
@@ -36,4 +37,10 @@
        /** Get the PacketThrottle for the node's current address for the 
standard packet size (if the 
         * address changes then we get a new throttle). */ 
        public PacketThrottle getThrottle();
+
+       /**
+        * Get the SocketHandler which handles incoming packets from this node.
+        * @return The SocketHandler for this peer.
+        */
+       SocketHandler getSocketHandler();
+       
+       /**
+        * Get the OutgoingPacketMangler which encrypts outgoing packets to this node.
+        * @return The OutgoingPacketMangler for this peer.
+        */
+       OutgoingPacketMangler getOutgoingMangler();
 }

Added: trunk/freenet/src/freenet/io/comm/SocketHandler.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/SocketHandler.java                        
        (rev 0)
+++ trunk/freenet/src/freenet/io/comm/SocketHandler.java        2007-06-27 
21:00:20 UTC (rev 13784)
@@ -0,0 +1,14 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.io.comm;
+
+/**
+ * Base interface for all transports. We have a single object of this type for both incoming and
+ * outgoing packets, but multiple instances for different instances of the transport e.g. on
+ * different ports, with different crypto backends etc.
+ * It currently declares no methods of its own.
+ * @author toad
+ */
+public interface SocketHandler {
+
+}

Added: trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java                     
        (rev 0)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java     2007-06-27 
21:00:20 UTC (rev 13784)
@@ -0,0 +1,367 @@
+package freenet.io.comm;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.Random;
+
+import org.tanukisoftware.wrapper.WrapperManager;
+
+import freenet.io.comm.Peer.LocalAddressException;
+import freenet.node.LoggingConfigHandler;
+import freenet.node.Node;
+import freenet.support.FileLoggerHook;
+import freenet.support.Logger;
+import freenet.support.OOMHandler;
+
+public class UdpSocketHandler extends Thread implements PacketSocketHandler {
+
+       private final DatagramSocket _sock;
+       private final InetAddress _bindTo;
+       private IncomingPacketFilter lowLevelFilter;
+       /** RNG for debugging, used with _dropProbability.
+        * NOT CRYPTO SAFE. DO NOT USE FOR THINGS THAT NEED CRYPTO SAFE RNG!
+        */
+       private Random dropRandom;
+       /** If >0, 1 in _dropProbability chance of dropping a packet; for 
debugging */
+       private int _dropProbability;
+       // Icky layer violation, but we need to know the Node to work around 
the EvilJVMBug.
+       private final Node node;
+       private static boolean logMINOR; 
+       private volatile int lastTimeInSeconds;
+       private boolean _isDone;
+       
+       /**
+        * Bind a UDP socket and prepare the receiver thread. The thread is not
+        * started here; call start(). setLowLevelFilter() should also be called
+        * before packets are expected.
+        * @param listenPort The UDP port to bind.
+        * @param bindto The local address to bind to.
+        * @param node The Node, used by the hang checker to restart or exit and to
+        * obtain the detected MTU.
+        * @throws SocketException If the socket cannot be bound or its receive
+        * buffer cannot be queried or resized.
+        */
+       public UdpSocketHandler(int listenPort, InetAddress bindto, Node node) throws SocketException {
+               super("MessageCore packet receiver thread on port " + listenPort);
+               this.node = node;
+               _bindTo = bindto;
+                   // Keep the Updater code in, just commented out, for now
+                   // We may want to be able to do on-line updates.
+//                     if (Updater.hasResource()) {
+//                             _sock = (DatagramSocket) Updater.getResource();
+//                     } else {
+               DatagramSocket sock = new DatagramSocket(listenPort, bindto);
+               boolean configured = false;
+               try {
+                       int sz = sock.getReceiveBufferSize();
+                       if(sz < 32768)
+                               sock.setReceiveBufferSize(32768);
+                       try {
+                               // Time out every 100ms so that the receive loop regains
+                               // control regularly (it refreshes lastTimeInSeconds for
+                               // the hang checker on every pass). This is ugly but our
+                               // only option without resorting to java.nio because
+                               // there is no way to forcefully interrupt a socket wait
+                               // operation.
+                               sock.setSoTimeout(100);
+                       } catch (SocketException e) {
+                               throw new RuntimeException(e);
+                       }
+                       configured = true;
+               } finally {
+                       // Do not leave the port bound if we are about to fail construction.
+                       if(!configured)
+                               sock.close();
+               }
+               _sock = sock;
+//                     }
+               // Only used for debugging, no need to seed from Yarrow
+               dropRandom = new Random();
+               logMINOR = Logger.shouldLog(Logger.MINOR, this);
+       }
+
+       /**
+        * Install the filter which receives every incoming packet. Must be called
+        * before packets arrive: realRun() dereferences the filter for each packet,
+        * and with no filter set that fails with an NPE per packet.
+        * @param f The filter to feed incoming packets to.
+        */
+       public void setLowLevelFilter(IncomingPacketFilter f) {
+               this.lowLevelFilter = f;
+       }
+       
+       /**
+        * @return The address which was passed to the constructor to bind the socket to.
+        */
+       public InetAddress getBindTo() {
+               return this._bindTo;
+       }
+       
+       /**
+        * Thread entry point: runs the receive loop. If the loop ever exits with a
+        * Throwable, as much diagnostic information as possible is dumped to
+        * stderr, each step in its own try block so that a failure in one step
+        * does not prevent the next. In all cases _isDone is set and waiters on
+        * this object (see close()) are notified before the thread ends.
+        */
+       public void run() { // Listen for packets
+               try {
+                       runLoop();
+               } catch (Throwable t) {
+                       // Impossible? It keeps on exiting. We get the below,
+                       // but not this...
+                       // Print the class name first, on its own.
+                       try {
+                               System.err.print(t.getClass().getName());
+                               System.err.println();
+                       } catch (Throwable tt) {};
+                       // Then the message.
+                       try {
+                               System.err.print(t.getMessage());
+                               System.err.println();
+                       } catch (Throwable tt) {};
+                       // Request garbage collection and finalization before the remaining output.
+                       try {
+                               System.gc();
+                               System.runFinalization();
+                               System.gc();
+                               System.runFinalization();
+                       } catch (Throwable tt) {}
+                       // Report free and total memory.
+                       try {
+                               Runtime r = Runtime.getRuntime();
+                               System.err.print(r.freeMemory());
+                               System.err.println();
+                               System.err.print(r.totalMemory());
+                               System.err.println();
+                       } catch (Throwable tt) {};
+                       // Finally the full stack trace.
+                       try {
+                               t.printStackTrace();
+                       } catch (Throwable tt) {};
+               } finally {
+                       System.err.println("run() exiting");
+                       Logger.error(this, "run() exiting");
+                       // Wake up anybody blocked in close().
+                       synchronized (this) {
+                               _isDone = true;
+                               notifyAll();
+                       }
+               }
+       }
+
+       /**
+        * The receive loop. One buffer and one DatagramPacket are allocated up
+        * front and reused for every receive. Each pass records the current time
+        * in lastTimeInSeconds (read by the hang checker) and then handles at most
+        * one packet. Anything thrown by a pass is reported and the loop carries on.
+        */
+       private void runLoop() {
+               final byte[] receiveBuffer = new byte[MAX_RECEIVE_SIZE];
+               final DatagramPacket packet = new DatagramPacket(receiveBuffer, receiveBuffer.length);
+               for (;;) {
+                       try {
+                               final long nowMillis = System.currentTimeMillis();
+                               lastTimeInSeconds = (int) (nowMillis / 1000);
+                               realRun(packet);
+                       } catch (OutOfMemoryError oom) {
+                               OOMHandler.handleOOM(oom);
+                               System.err.println("Will retry above failed operation...");
+                       } catch (Throwable thrown) {
+                               System.err.println("Caught "+thrown);
+                               thrown.printStackTrace(System.err);
+                               Logger.error(this, "Caught " + thrown, thrown);
+                       }
+               }
+       }
+       
+       /**
+        * One pass of the receive loop: wait for a packet (up to the socket
+        * timeout) and, if one arrived, hand it to the low level filter.
+        * Anything thrown while processing is logged here and not propagated.
+        * @param packet The reusable packet to receive into.
+        */
+       private void realRun(DatagramPacket packet) {
+               // Single receiving thread
+               if (!getPacket(packet)) {
+                       if(logMINOR) Logger.minor(this, "Null packet");
+                       return;
+               }
+               long began = System.currentTimeMillis();
+               Peer peer = new Peer(packet.getAddress(), packet.getPort());
+               logIfSlow("packet creation", System.currentTimeMillis() - began);
+               byte[] data = packet.getData();
+               int offset = packet.getOffset();
+               int length = packet.getLength();
+               try {
+                       if(logMINOR) Logger.minor(this, "Processing packet of length "+length+" from "+peer);
+                       began = System.currentTimeMillis();
+                       lowLevelFilter.process(data, offset, length, peer);
+                       logIfSlow("processing packet", System.currentTimeMillis() - began);
+                       if(logMINOR) Logger.minor(this, "Successfully handled packet length " + length);
+               } catch (Throwable t) {
+                       Logger.error(this, "Caught " + t + " from " + lowLevelFilter, t);
+               }
+       }
+
+       /**
+        * Report an operation which took longer than 50ms: as an error above
+        * 3000ms, otherwise at MINOR if that is enabled.
+        */
+       private void logIfSlow(String what, long elapsedMillis) {
+               if(elapsedMillis <= 50) return;
+               if(elapsedMillis > 3000)
+                       Logger.error(this, what+" took "+elapsedMillis+"ms");
+               else if(logMINOR)
+                       Logger.minor(this, what+" took "+elapsedMillis+"ms");
+       }
+       
+    // FIXME necessary to deal with bugs around build 1000; arguably necessary 
to deal with large node names in connection setup
+    // Revert to 1500?
+    private static final int MAX_RECEIVE_SIZE = 2048;
+    
+    /**
+     * Block until a datagram arrives or the socket timeout expires.
+     * @param packet The packet to receive into.
+     * @return True if a packet was received, false if the receive timed out.
+     * @throws RuntimeException Wrapping any other IOException from the socket.
+     */
+    private boolean getPacket(DatagramPacket packet) {
+               try {
+                       _sock.receive(packet);
+                       // TODO: keep?
+                       final String source = packet.getAddress() + ":" + packet.getPort();
+                       IOStatisticCollector.addInfo(source, packet.getLength(), 0);
+               } catch (SocketTimeoutException timedOut) {
+                       return false;
+               } catch (IOException failure) {
+                       throw new RuntimeException(failure);
+               }
+               if(logMINOR) {
+                       Logger.minor(this, "Received packet");
+               }
+               return true;
+       }
+
+       /**
+        * Send a block of encoded bytes to a peer. This is called by
+        * send, and by IncomingPacketFilter.processOutgoing(..).
+        * The packet is silently discarded if the destination has no resolvable
+        * address, or (debugging only) with a 1 in _dropProbability chance.
+        * Failures from the socket are logged, not thrown.
+        * @param blockToSend The data block to send.
+        * @param destination The peer to send it to.
+        * @param allowLocalAddresses Passed on to Peer.getAddress() when resolving
+        * the destination.
+        * @throws LocalAddressException Propagated from Peer.getAddress().
+        */
+    public void sendPacket(byte[] blockToSend, Peer destination, boolean allowLocalAddresses) throws LocalAddressException {
+       assert(blockToSend != null);
+               // there should be no DNS needed here, but go ahead if we can, but complain doing it
+               InetAddress address = destination.getAddress(false, allowLocalAddresses);
+               if( address == null ) {
+                       Logger.error(this, "Tried sending to destination without pre-looked up IP address(needs a real Peer.getHostname()): null:" + destination.getPort(), new Exception("error"));
+                       // Use the result of the lookup directly rather than asking a
+                       // third time and relying on an assert to catch a null.
+                       address = destination.getAddress(true, allowLocalAddresses);
+                       if( address == null ) {
+                               Logger.error(this, "Tried sending to bad destination address: null:" + destination.getPort(), new Exception("error"));
+                               return;
+                       }
+               }
+               // Read once: the setter may be called from another thread.
+               final int dropProbability = _dropProbability;
+               if (dropProbability > 0) {
+                       if (dropRandom.nextInt(dropProbability) == 0) {
+                               if(logMINOR) Logger.minor(this, "DROPPED: " + _sock.getLocalPort() + " -> " + destination.getPort());
+                               return;
+                       }
+               }
+               int port = destination.getPort();
+               DatagramPacket packet = new DatagramPacket(blockToSend, blockToSend.length);
+               packet.setAddress(address);
+               packet.setPort(port);
+               
+               // TODO: keep?
+               // packet.length() is simply the size of the buffer, it knows nothing of UDP headers
+               IOStatisticCollector.addInfo(address + ":" + port, 0, blockToSend.length + UDP_HEADERS_LENGTH); 
+               
+               try {
+                       _sock.send(packet);
+               } catch (IOException e) {
+                       // Failures to IPv6 destinations are only logged at NORMAL.
+                       if(packet.getAddress() instanceof Inet6Address)
+                               Logger.normal(this, "Error while sending packet to IPv6 address: "+destination+": "+e, e);
+                       else
+                               Logger.error(this, "Error while sending packet to " + destination+": "+e, e);
+               }
+    }
+
+       // CompuServe use 1400 MTU; AOL claim 1450; DFN at home use 1448.
+       // http://info.aol.co.uk/broadband/faqHomeNetworking.adp
+       // 
http://www.compuserve.de/cso/hilfe/linux/hilfekategorien/installation/contentview.jsp?conid=385700
+       // http://www.studenten-ins-netz.net/inhalt/service_faq.html
+       // officially GRE is 1476 and PPPoE is 1492.
+       // unofficially, PPPoE is often 1472 (seen in the wild). Also PPPoATM 
is sometimes 1472.
+    static final int MAX_ALLOWED_MTU = 1400;
+    // FIXME this is different for IPv6 (check all uses of constant when 
fixing)
+    public static final int UDP_HEADERS_LENGTH = 28;
+    
+    /**
+     * The payload limit is the smaller of the minimum detected MTU and
+     * MAX_ALLOWED_MTU, less the UDP/IP headers (28 bytes). A detected MTU
+     * below 1100 is ignored (and logged as an error), in which case
+     * MAX_ALLOWED_MTU is used.
+     * @return The maximum packet size supported by this socket handler, not including transport (UDP/IP) headers.
+     */
+    public int getMaxPacketSize() { //FIXME: what about passing a peerNode though and doing it on a per-peer basis?
+       final int minAdvertisedMTU = node.ipDetector.getMinimumDetectedMTU();
+       
+       // We don't want the MTU detection thingy to prevent us to send PacketTransmits!
+       if(minAdvertisedMTU < 1100){
+               // Log the value we actually tested, not a second reading.
+               Logger.error(this, "It shouldn't happen : we disabled the MTU detection algorithm because the advertised MTU is smallish !! ("+minAdvertisedMTU+')'); 
+               return MAX_ALLOWED_MTU - UDP_HEADERS_LENGTH;
+       }
+       return Math.min(minAdvertisedMTU, MAX_ALLOWED_MTU) - UDP_HEADERS_LENGTH;
+    }
+
+       /**
+        * Start the receiver thread with the hang checker enabled.
+        */
+       public void start() {
+               final boolean disableHangChecker = false;
+               start(disableHangChecker);
+       }
+       
+       /**
+        * Start the receiver thread as a maximum priority daemon and, unless
+        * disabled, a second maximum priority daemon thread running USMChecker
+        * to watch over it.
+        * @param disableHangChecker If true, do not start the USMChecker thread.
+        */
+       public void start(boolean disableHangChecker) {
+               // Initialise the timestamp so the checker does not see a stale value.
+               lastTimeInSeconds = (int) (System.currentTimeMillis() / 1000);
+               setDaemon(true);
+               setPriority(Thread.MAX_PRIORITY);
+               super.start();
+               if(disableHangChecker) return;
+               Thread hangChecker = new Thread(new USMChecker(), "MessageCore$USMChecker");
+               hangChecker.setDaemon(true);
+               hangChecker.setPriority(Thread.MAX_PRIORITY);
+               hangChecker.start();
+       }
+       
+       /**
+        * Watchdog for the receiver thread. Every 10 seconds it checks that the
+        * receiver thread is alive and that the receive loop has updated
+        * lastTimeInSeconds within the last 3 minutes. If the loop has stalled,
+        * the node is restarted via the wrapper, or exited if there is no wrapper.
+        * If the thread has died, the node is exited.
+        */
+       public class USMChecker implements Runnable {
+               public void run() {
+                       while(true) {
+                               // Refresh the cached log level on every pass.
+                               logMINOR = Logger.shouldLog(Logger.MINOR, UdpSocketHandler.this);
+                               try {
+                                       Thread.sleep(10*1000);
+                               } catch (InterruptedException e) {
+                                       // Ignore
+                               }
+                               if(UdpSocketHandler.this.isAlive()) {
+                                       if(logMINOR) Logger.minor(this, "PING on "+UdpSocketHandler.this);
+                                       long time = System.currentTimeMillis();
+                                       int timeSecs = (int) (time / 1000);
+                                       // The receive loop stamps lastTimeInSeconds on every pass.
+                                       if(timeSecs - lastTimeInSeconds > 3*60) {
+                                               
+                                               // USM has hung.
+                                               // Probably caused by the EvilJVMBug (see PacketSender).
+                                               // We'd better restart... :(
+                                               
+                                               // Only write to stderr if the file logger has not redirected it.
+                                               LoggingConfigHandler lch = Node.logConfigHandler;
+                                               FileLoggerHook flh = lch == null ? null : lch.getFileLoggerHook();
+                                               boolean hasRedirected = flh == null ? false : flh.hasRedirectedStdOutErrNoLock();
+                                               
+                                               if(!hasRedirected)
+                                                       System.err.println("Restarting node: MessageCore froze for 3 minutes!");
+                                               
+                                               try {
+                                                       if(node.isUsingWrapper()){
+                                                               WrapperManager.requestThreadDump();
+                                                               WrapperManager.restart();
+                                                       }else{
+                                                               if(!hasRedirected)
+                                                                       System.err.println("Exiting on deadlock, but not running in the wrapper! Please restart the node manually.");
+                                                               
+                                                               // No wrapper : we don't want to let it harm the network!
+                                                               node.exit("USM deadlock");
+                                                       }
+                                               } catch (Throwable t) {
+                                                       // The restart or exit attempt itself failed: report and exit.
+                                                       if(!hasRedirected) {
+                                                               System.err.println("Error : can't restart the node : consider installing the wrapper. PLEASE REPORT THAT ERROR TO devl at freenetproject.org");
+                                                               t.printStackTrace();
+                                                       }
+                                                       node.exit("USM deadlock and error");
+                                               }
+                                       }
+                               } else {
+                                       // The receiver thread is no longer running.
+                                       Logger.error(this, "MAIN LOOP TERMINATED");
+                                       System.err.println("MAIN LOOP TERMINATED!");
+                                       node.exit(Node.EXIT_MAIN_LOOP_LOST);
+                               }
+                       }
+               }
+       }
+
+    /**
+     * Wait for the receiver thread to finish, then close the socket.
+     * NOTE(review): this blocks until run() has set _isDone, and nothing in
+     * this method asks the receive loop to stop; confirm that callers arrange
+     * for the loop to exit, otherwise this never returns.
+     * @param exit If true the socket is closed. close(false) is not
+     * implemented and only logs a fatal error.
+     */
+    public void close(boolean exit) {
+       Logger.error(this, "Closing.", new Exception("error"));
+               boolean interrupted = false;
+               synchronized (this) {
+                       while (!_isDone) {
+                               try {
+                                       wait(2000);
+                               } catch (InterruptedException e) {
+                                       e.printStackTrace();
+                                       // Keep waiting, but remember the interrupt so it is not lost.
+                                       interrupted = true;
+                               }
+                       }
+               }
+               if (exit) {
+                       _sock.close();
+               } else {
+                   Logger.fatal(this, 10, "Not implemented: close(false)");
+                       //Updater.saveResource(_sock);
+               }
+               // Restore the interrupt status for the caller now that we are done.
+               if (interrupted)
+                       Thread.currentThread().interrupt();
+       }
+    
+       /**
+        * @return The debugging drop setting: if greater than zero, outgoing
+        * packets are dropped with a 1 in N chance.
+        */
+       public int getDropProbability() {
+               return this._dropProbability;
+       }
+       
+       /**
+        * For debugging: make sendPacket() drop packets at random.
+        * @param dropProbability If greater than zero, there is a 1 in
+        * dropProbability chance of dropping each outgoing packet. Otherwise
+        * nothing is dropped.
+        */
+       public void setDropProbability(int dropProbability) {
+               this._dropProbability = dropProbability;
+       }
+
+    /**
+     * @return The local port number reported by the socket.
+     */
+    public int getPortNumber() {
+        final int localPort = _sock.getLocalPort();
+        return localPort;
+    }
+
+       /**
+        * @return The socket's local address and port, separated by a colon.
+        */
+       public String toString() {
+               StringBuffer description = new StringBuffer();
+               description.append(_sock.getLocalAddress());
+               description.append(":");
+               description.append(_sock.getLocalPort());
+               return description.toString();
+       }
+
+       /**
+        * @return The size of the transport layer headers, which for this
+        * handler is always UDP_HEADERS_LENGTH.
+        */
+       public int getHeadersLength() {
+               final int headersLength = UDP_HEADERS_LENGTH;
+               return headersLength;
+       }
+
+}

Deleted: trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketManager.java     2007-06-27 
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketManager.java     2007-06-27 
21:00:20 UTC (rev 13784)
@@ -1,853 +0,0 @@
-/*
- * Dijjer - A Peer to Peer HTTP Cache
- * Copyright (C) 2004,2005 Change.Tv, Inc
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- * 
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
- */
-package freenet.io.comm;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-import org.tanukisoftware.wrapper.WrapperManager;
-
-import freenet.io.comm.Peer.LocalAddressException;
-import freenet.node.LoggingConfigHandler;
-import freenet.node.Node;
-import freenet.node.PeerNode;
-import freenet.support.FileLoggerHook;
-import freenet.support.Logger;
-import freenet.support.OOMHandler;
-import freenet.support.TimeUtil;
-
-public class UdpSocketManager extends Thread {
-
-       public static final String VERSION = "$Id: UdpSocketManager.java,v 1.22 
2005/08/25 17:28:19 amphibian Exp $";
-       private static boolean logMINOR; 
-       private Dispatcher _dispatcher;
-       private final DatagramSocket _sock;
-       /** _filters serves as lock for both */
-       private final LinkedList _filters = new LinkedList();
-       private final LinkedList _unclaimed = new LinkedList();
-       private int _dropProbability;
-       private IncomingPacketFilter lowLevelFilter;
-       /** RNG for debugging, used with _dropProbability.
-        * NOT CRYPTO SAFE. DO NOT USE FOR THINGS THAT NEED CRYPTO SAFE RNG!
-        */
-       private Random dropRandom;
-       private boolean _isDone;
-       private static UdpSocketManager _usm;
-       private static final int MAX_UNMATCHED_FIFO_SIZE = 50000;
-       private static final long MAX_UNCLAIMED_FIFO_ITEM_LIFETIME = 
60*60*1000;  // 1 hour
-       private volatile int lastTimeInSeconds;
-       private final InetAddress _bindTo;
-
-       // Icky layer violation, but we need to know the Node to work around 
the EvilJVMBug.
-       private final Node node;
-       
-       public void start() {
-               start(false);
-       }
-       
-       public void start(boolean disableHangChecker) {
-               lastTimeInSeconds = (int) (System.currentTimeMillis() / 1000);
-               setDaemon(true);
-               setPriority(Thread.MAX_PRIORITY);
-               super.start();
-               if(!disableHangChecker) {
-                       Thread checker = new Thread(new USMChecker(), 
"UdpSocketManager$USMChecker");
-                       checker.setDaemon(true);
-                       checker.setPriority(Thread.MAX_PRIORITY);
-                       checker.start();
-               }
-       }
-       
-       public class USMChecker implements Runnable {
-               public void run() {
-                       while(true) {
-                               logMINOR = Logger.shouldLog(Logger.MINOR, 
UdpSocketManager.this);
-                               try {
-                                       Thread.sleep(10*1000);
-                               } catch (InterruptedException e) {
-                                       // Ignore
-                               }
-                               if(UdpSocketManager.this.isAlive()) {
-                                       if(logMINOR) Logger.minor(this, "PING 
on "+UdpSocketManager.this);
-                                       long time = System.currentTimeMillis();
-                                       int timeSecs = (int) (time / 1000);
-                                       if(timeSecs - lastTimeInSeconds > 3*60) 
{
-                                               
-                                               // USM has hung.
-                                               // Probably caused by the 
EvilJVMBug (see PacketSender).
-                                               // We'd better restart... :(
-                                               
-                                               LoggingConfigHandler lch = 
Node.logConfigHandler;
-                                               FileLoggerHook flh = lch == 
null ? null : lch.getFileLoggerHook();
-                                               boolean hasRedirected = flh == 
null ? false : flh.hasRedirectedStdOutErrNoLock();
-                                               
-                                               if(!hasRedirected)
-                                                       
System.err.println("Restarting node: UdpSocketManager froze for 3 minutes!");
-                                               
-                                               try {
-                                                       
if(node.isUsingWrapper()){
-                                                               
WrapperManager.requestThreadDump();
-                                                               
WrapperManager.restart();
-                                                       }else{
-                                                               
if(!hasRedirected)
-                                                                       
System.err.println("Exiting on deadlock, but not running in the wrapper! Please 
restart the node manually.");
-                                                               
-                                                               // No wrapper : 
we don't want to let it harm the network!
-                                                               node.exit("USM 
deadlock");
-                                                       }
-                                               } catch (Throwable t) {
-                                                       if(!hasRedirected) {
-                                                               
System.err.println("Error : can't restart the node : consider installing the 
wrapper. PLEASE REPORT THAT ERROR TO devl at freenetproject.org");
-                                                               
t.printStackTrace();
-                                                       }
-                                                       node.exit("USM deadlock 
and error");
-                                               }
-                                       }
-                               } else {
-                                       Logger.error(this, "MAIN LOOP 
TERMINATED");
-                                       System.err.println("MAIN LOOP 
TERMINATED!");
-                                       node.exit(Node.EXIT_MAIN_LOOP_LOST);
-                               }
-                       }
-               }
-       }
-
-       public UdpSocketManager(int listenPort, InetAddress bindto, Node node) 
throws SocketException {
-               super("UdpSocketManager packet receiver thread on port " + 
listenPort);
-               this.node = node;
-               _bindTo = bindto;
-                   // Keep the Updater code in, just commented out, for now
-                   // We may want to be able to do on-line updates.
-//                     if (Updater.hasResource()) {
-//                             _sock = (DatagramSocket) Updater.getResource();
-//                     } else {
-               _sock = new DatagramSocket(listenPort, bindto);
-               int sz = _sock.getReceiveBufferSize();
-               if(sz < 32768)
-                       _sock.setReceiveBufferSize(32768);
-               try {
-                       // We make it timeout every 100ms so that we can check 
for
-                       // _filters which have timed out, this
-                       // is ugly but our only option without resorting to 
java.nio
-                       // because there is no way to forcefully
-                       // interrupt a socket wait operation
-                       _sock.setSoTimeout(100);
-               } catch (SocketException e) {
-                       throw new RuntimeException(e);
-               }
-//                     }
-               // Only used for debugging, no need to seed from Yarrow
-               dropRandom = new Random();
-               _timedOutFilters = new Vector(32);
-               logMINOR = Logger.shouldLog(Logger.MINOR, this);
-       }
-
-       public InetAddress getBindTo() {
-               return _bindTo;
-       }
-       
-       public void run() { // Listen for packets
-               try {
-                       runLoop();
-               } catch (Throwable t) {
-                       // Impossible? It keeps on exiting. We get the below,
-                       // but not this...
-                       try {
-                               System.err.print(t.getClass().getName());
-                               System.err.println();
-                       } catch (Throwable tt) {};
-                       try {
-                               System.err.print(t.getMessage());
-                               System.err.println();
-                       } catch (Throwable tt) {};
-                       try {
-                               System.gc();
-                               System.runFinalization();
-                               System.gc();
-                               System.runFinalization();
-                       } catch (Throwable tt) {}
-                       try {
-                               Runtime r = Runtime.getRuntime();
-                               System.err.print(r.freeMemory());
-                               System.err.println();
-                               System.err.print(r.totalMemory());
-                               System.err.println();
-                       } catch (Throwable tt) {};
-                       try {
-                               t.printStackTrace();
-                       } catch (Throwable tt) {};
-               } finally {
-                       System.err.println("run() exiting");
-                       Logger.error(this, "run() exiting");
-                       synchronized (this) {
-                               _isDone = true;
-                               notifyAll();
-                       }
-               }
-       }
-
-       private void runLoop() {
-               byte[] buf = new byte[MAX_RECEIVE_SIZE];
-               DatagramPacket packet = new DatagramPacket(buf, buf.length);
-               while (/*_active*/true) {
-                       try {
-                               lastTimeInSeconds = (int) 
(System.currentTimeMillis() / 1000);
-                               realRun(packet);
-            } catch (OutOfMemoryError e) {
-                               OOMHandler.handleOOM(e);
-                               System.err.println("Will retry above failed 
operation...");
-                       } catch (Throwable t) {
-                               System.err.println("Caught "+t);
-                               t.printStackTrace(System.err);
-                               Logger.error(this, "Caught " + t, t);
-                       }
-               }
-       }
-       
-       private void realRun(DatagramPacket packet) {
-               // Single receiving thread
-               boolean gotPacket = getPacket(packet);
-               // Check for timedout _filters
-               removeTimedOutFilters();
-               if (gotPacket) {
-                       long startTime = System.currentTimeMillis();
-                       Peer peer = new Peer(packet.getAddress(), 
packet.getPort());
-                       long endTime = System.currentTimeMillis();
-                       if(endTime - startTime > 50) {
-                               if(endTime-startTime > 3000)
-                                       Logger.error(this, "packet creation 
took "+(endTime-startTime)+"ms");
-                               else
-                                       if(logMINOR) Logger.minor(this, "packet 
creation took "+(endTime-startTime)+"ms");
-                       }
-                       byte[] data = packet.getData();
-                       int offset = packet.getOffset();
-                       int length = packet.getLength();
-                       try {
-                               if(logMINOR) Logger.minor(this, "Processing 
packet of length "+length+" from "+peer);
-                               startTime = System.currentTimeMillis();
-                               lowLevelFilter.process(data, offset, length, 
peer);
-                               endTime = System.currentTimeMillis();
-                               if(endTime - startTime > 50) {
-                                       if(endTime-startTime > 3000)
-                                               Logger.error(this, "processing 
packet took "+(endTime-startTime)+"ms");
-                                       else
-                                               if(logMINOR) Logger.minor(this, 
"processing packet took "+(endTime-startTime)+"ms");
-                               }
-                               if(logMINOR) Logger.minor(this,
-                                               "Successfully handled packet 
length " + length);
-                       } catch (Throwable t) {
-                               Logger.error(this, "Caught " + t + " from "
-                                               + lowLevelFilter, t);
-                       }
-               } else if(logMINOR) Logger.minor(this, "Null packet");
-       }
-       
-       /**
-        * Decode a packet from data and a peer.
-        * Can be called by IncomingPacketFilter's.
-     * @param data
-     * @param offset
-     * @param length
-     * @param peer
-     */
-    public Message decodeSingleMessage(byte[] data, int offset, int length, 
PeerContext peer, int overhead) {
-        try {
-            return Message.decodeMessageFromPacket(data, offset, length, peer, 
overhead);
-        } catch (Throwable t) {
-            Logger.error(this, "Could not decode packet: "+t, t);
-            return null;
-        }
-    }
-
-    // FIXME necessary to deal with bugs around build 1000; arguably necessary 
to deal with large node names in connection setup
-    // Revert to 1500?
-    private static final int MAX_RECEIVE_SIZE = 2048;
-    
-    private boolean getPacket(DatagramPacket packet) {
-               try {
-                       _sock.receive(packet);
-                       // TODO: keep?
-                       IOStatisticCollector.addInfo(packet.getAddress() + ":" 
+ packet.getPort(),
-                                       packet.getLength(), 0);
-               } catch (SocketTimeoutException e1) {
-                       return false;
-               } catch (IOException e2) {
-                       throw new RuntimeException(e2);
-               }
-               if(logMINOR) Logger.minor(this, "Received packet");
-               return true;
-       }
-
-    /** Only used by removeTimedOutFilters() - if future code uses this 
elsewhere, we need to
-     * reconsider its locking. */
-    private final Vector _timedOutFilters;
-    
-    /**
-     * Remove timed out filters.
-     * Only called by realRun(), so it can move timed out filters to the 
_timedOutFilters array,
-     * and then tell them that they are timed out without holding locks.
-     *
-     */
-       private void removeTimedOutFilters() {
-               long tStart = System.currentTimeMillis();
-               synchronized (_filters) {
-                       for (ListIterator i = _filters.listIterator(); 
i.hasNext();) {
-                               MessageFilter f = (MessageFilter) i.next();
-                               if (f.timedOut()) {
-                                       i.remove();
-                                       _timedOutFilters.add(f);
-                               } else { // Because _filters are in order of 
timeout, we
-                                       // can abort the iteration as soon as 
we find one that
-                                       // doesn't timeout
-                                       break;
-                               }
-                       }
-               }
-               
-               for(int i=0;i<_timedOutFilters.size();i++) {
-                       MessageFilter f = (MessageFilter) 
_timedOutFilters.get(i);
-                       f.setMessage(null);
-                       f.onTimedOut();
-               }
-               _timedOutFilters.clear();
-               
-               long tEnd = System.currentTimeMillis();
-               if(tEnd - tStart > 50) {
-                       if(tEnd - tStart > 3000)
-                               Logger.error(this, "removeTimedOutFilters took 
"+(tEnd-tStart)+"ms");
-                       else
-                               if(logMINOR) Logger.minor(this, 
"removeTimedOutFilters took "+(tEnd-tStart)+"ms");
-               }
-       }
-
-       /**
-        * Dispatch a message to a waiting filter, or feed it to the
-        * Dispatcher if none are found.
-        * @param m The Message to dispatch.
-        */
-       public void checkFilters(Message m) {
-               long tStart = System.currentTimeMillis();
-               if(logMINOR) Logger.minor(this, "checkFilters: "+m+" from 
"+m.getSource());
-               if ((m.getSource()) instanceof PeerNode)
-               {
-                       
((PeerNode)m.getSource()).addToLocalNodeReceivedMessagesFromStatistic(m);
-               }
-               boolean matched = false;
-               if ((!(m.getSpec().equals(DMT.packetTransmit))) && logMINOR) {
-                       if ((m.getSpec().equals(DMT.ping) || 
m.getSpec().equals(DMT.pong)) && Logger.shouldLog(Logger.DEBUG, this)) {
-                               Logger.debug(this, "" + 
(System.currentTimeMillis() % 60000) + ' ' + _sock.getLocalPort() + " <- "
-                                               + m.getSource() + " : " + m);
-                       } else {
-                               if(logMINOR) Logger.minor(this, "" + 
(System.currentTimeMillis() % 60000) + ' ' + _sock.getLocalPort() + " <- "
-                                               + m.getSource() + " : " + m);
-                       }
-               }
-               MessageFilter match = null;
-               synchronized (_filters) {
-                       for (ListIterator i = _filters.listIterator(); 
i.hasNext();) {
-                               MessageFilter f = (MessageFilter) i.next();
-                               if (f.match(m)) {
-                                       matched = true;
-                                       i.remove();
-                                       match = f;
-                                       if(logMINOR) Logger.minor(this, 
"Matched: "+f);
-                                       break; // Only one match permitted per 
message
-                               }
-                       }
-               }
-               if(match != null) {
-                       match.setMessage(m);
-                       match.onMatched();
-               }
-               // Feed unmatched messages to the dispatcher
-               if ((!matched) && (_dispatcher != null)) {
-                   try {
-                       if(logMINOR) Logger.minor(this, "Feeding to dispatcher: 
"+m);
-                       matched = _dispatcher.handleMessage(m);
-                   } catch (Throwable t) {
-                       Logger.error(this, "Dispatcher threw "+t, t);
-                   }
-               }
-               // Keep the last few _unclaimed messages around in case the 
intended receiver isn't receiving yet
-               if (!matched) {
-                       if(logMINOR) Logger.minor(this, "Unclaimed: "+m);
-                   /** Check filters and then add to _unmatched is ATOMIC
-                    * It has to be atomic, because otherwise we can get a
-                    * race condition that results in timeouts on MFs.
-                    * 
-                    * Specifically:
-                    * - Thread A receives packet
-                    * - Thread A checks filters. It doesn't match any.
-                    * - Thread A feeds to Dispatcher.
-                    * - Thread B creates filter.
-                    * - Thread B checks _unmatched.
-                    * - Thread B adds filter.
-                    * - Thread B sleeps.
-                    * - Thread A returns from Dispatcher. Which didn't match.
-                    * - Thread A adds to _unmatched.
-                    * 
-                    * OOPS!
-                    * The only way to fix this is to have checking the
-                    * filters and unmatched be a single atomic operation.
-                    * Another race is possible if we merely recheck the
-                    * filters after we return from dispatcher, for example.
-                    */
-                       synchronized (_filters) {
-                               if(logMINOR) Logger.minor(this, "Rechecking 
filters and adding message");
-                               for (ListIterator i = _filters.listIterator(); 
i.hasNext();) {
-                                       MessageFilter f = (MessageFilter) 
i.next();
-                                       if (f.match(m)) {
-                                               matched = true;
-                                               match = f;
-                                               i.remove();
-                                               if(logMINOR) Logger.minor(this, 
"Matched: "+f);
-                                               break; // Only one match 
permitted per message
-                                       }
-                               }
-                               if(!matched) {
-                                   while (_unclaimed.size() > 
MAX_UNMATCHED_FIFO_SIZE) {
-                                       Message removed = 
(Message)_unclaimed.removeFirst();
-                                       long messageLifeTime = 
System.currentTimeMillis() - removed.localInstantiationTime;
-                                       if ((removed.getSource()) instanceof 
PeerNode) {
-                                           Logger.normal(this, "Dropping 
unclaimed from "+removed.getSource().getPeer()+", lived 
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (quantity)"+": "+removed);
-                                       } else {
-                                           Logger.normal(this, "Dropping 
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" 
(quantity)"+": "+removed);
-                                       }
-                                   }
-                                   _unclaimed.addLast(m);
-                                   if(logMINOR) Logger.minor(this, "Done");
-                               }
-                       }
-                       if(match != null) {
-                               match.setMessage(m);
-                               match.onMatched();
-                       }
-               }
-               long tEnd = System.currentTimeMillis();
-               if(tEnd - tStart > 50) {
-                       if(tEnd - tStart > 3000)
-                               Logger.error(this, "checkFilters took 
"+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for 
matched: "+matched);
-                       else
-                               if(logMINOR) Logger.minor(this, "checkFilters 
took "+(tEnd-tStart)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" for 
matched: "+matched);
-               }
-       }
-       
-       /** IncomingPacketFilter should call this when a node is disconnected. 
*/
-       public void onDisconnect(PeerContext ctx) {
-               Vector droppedFilters = null; // rare operation, we can waste 
objects for better locking
-           synchronized(_filters) {
-                       ListIterator i = _filters.listIterator();
-                       while (i.hasNext()) {
-                           MessageFilter f = (MessageFilter) i.next();
-                           if(f.matchesDroppedConnection(ctx)) {
-                               if(droppedFilters == null)
-                                       droppedFilters = new Vector();
-                               droppedFilters.add(f);
-                               i.remove();
-                           }
-                       }
-           }
-           if(droppedFilters != null) {
-               for(int i=0;i<droppedFilters.size();i++) {
-                       MessageFilter mf = (MessageFilter) 
droppedFilters.get(i);
-                       mf.onDroppedConnection(ctx);
-               }
-           }
-       }
-       
-       /** IncomingPacketFilter should call this when a node connects with a 
new boot ID */
-       public void onRestart(PeerContext ctx) {
-               Vector droppedFilters = null; // rare operation, we can waste 
objects for better locking
-           synchronized(_filters) {
-                       ListIterator i = _filters.listIterator();
-                       while (i.hasNext()) {
-                           MessageFilter f = (MessageFilter) i.next();
-                           if(f.matchesRestartedConnection(ctx)) {
-                               if(droppedFilters == null)
-                                       droppedFilters = new Vector();
-                               droppedFilters.add(f);
-                               i.remove();
-                           }
-                       }
-           }
-           if(droppedFilters != null) {
-               for(int i=0;i<droppedFilters.size();i++) {
-                       MessageFilter mf = (MessageFilter) 
droppedFilters.get(i);
-                       mf.onRestartedConnection(ctx);
-               }
-           }
-       }
-
-       public void addAsyncFilter(MessageFilter filter, 
AsyncMessageFilterCallback callback) throws DisconnectedException {
-               filter.setAsyncCallback(callback);
-               filter.onStartWaiting();
-               boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
-               if(logDEBUG) Logger.debug(this, "Adding async filter "+filter+" 
for "+callback);
-               Message ret = null;
-               if((lowLevelFilter != null) && (filter._source != null) && 
-                       filter.matchesDroppedConnection(filter._source) &&
-                       lowLevelFilter.isDisconnected(filter._source))
-                   throw new DisconnectedException();
-               // Check to see whether the filter matches any of the recently 
_unclaimed messages
-               // Drop any _unclaimed messages that the filter doesn't match 
that are also older than MAX_UNCLAIMED_FIFO_ITEM_LIFETIME
-               long now = System.currentTimeMillis();
-               long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
-               long messageLifeTime = 0;
-               synchronized (_filters) {
-                       if(logMINOR) Logger.minor(this, "Checking _unclaimed");
-                       for (ListIterator i = _unclaimed.listIterator(); 
i.hasNext();) {
-                               Message m = (Message) i.next();
-                               if (filter.match(m)) {
-                                       i.remove();
-                                       ret = m;
-                                       if(logMINOR) Logger.debug(this, 
"Matching from _unclaimed");
-                                       break;
-                               } else if (m.localInstantiationTime < 
messageDropTime) {
-                                       i.remove();
-                                       messageLifeTime = now - 
m.localInstantiationTime;
-                                       if ((m.getSource()) instanceof 
PeerNode) {
-                                               Logger.normal(this, "Dropping 
unclaimed from "+m.getSource().getPeer()+", lived 
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
-                                       } else {
-                                               Logger.normal(this, "Dropping 
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": 
"+m);
-                                       }
-                               }
-                       }
-                       if (ret == null) {
-                               if(logMINOR) Logger.minor(this, "Not in 
_unclaimed");
-                           // Insert filter into filter list in order of 
timeout
-                               ListIterator i = _filters.listIterator();
-                               while (true) {
-                                       if (!i.hasNext()) {
-                                               i.add(filter);
-                                               if(logMINOR) Logger.minor(this, 
"Added at end");
-                                               break;
-                                       }
-                                       MessageFilter mf = (MessageFilter) 
i.next();
-                                       if (mf.getTimeout() > 
filter.getTimeout()) {
-                                               i.previous();
-                                               i.add(filter);
-                                               if(logMINOR) Logger.minor(this, 
"Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
-                                               break;
-                                       }
-                               }
-                       }
-               }
-               if(ret != null) {
-                       filter.onMatched();
-                       filter.clearMatched();
-               }
-       }
-
-       /**
-        * Wait for a filter to trigger, or timeout. Blocks until either the 
trigger is activated, or it times
-        * out, or the peer is disconnected.
-        * @param filter The filter to wait for.
-        * @param ctr Byte counter to add bytes from the message to.
-        * @return Either a message, or null if the filter timed out.
-        * @throws DisconnectedException If the single peer being waited for 
disconnects.
-        */
-       public Message waitFor(MessageFilter filter, ByteCounter ctr) throws 
DisconnectedException {
-               boolean logDEBUG = Logger.shouldLog(Logger.DEBUG, this);
-               if(logDEBUG) Logger.debug(this, "Waiting for "+filter);
-               long startTime = System.currentTimeMillis();
-               filter.onStartWaiting();
-               Message ret = null;
-               if((lowLevelFilter != null) && (filter._source != null) && 
-                       filter.matchesDroppedConnection(filter._source) &&
-                       lowLevelFilter.isDisconnected(filter._source))
-                   throw new DisconnectedException();
-               // Check to see whether the filter matches any of the recently 
_unclaimed messages
-               // Drop any _unclaimed messages that the filter doesn't match 
that are also older than MAX_UNCLAIMED_FIFO_ITEM_LIFETIME
-               long now = System.currentTimeMillis();
-               long messageDropTime = now - MAX_UNCLAIMED_FIFO_ITEM_LIFETIME;
-               long messageLifeTime = 0;
-               synchronized (_filters) {
-                       if(logMINOR) Logger.minor(this, "Checking _unclaimed");
-                       for (ListIterator i = _unclaimed.listIterator(); 
i.hasNext();) {
-                               Message m = (Message) i.next();
-                               if (filter.match(m)) {
-                                       i.remove();
-                                       ret = m;
-                                       if(logMINOR) Logger.debug(this, 
"Matching from _unclaimed");
-                                       break;
-                               } else if (m.localInstantiationTime < 
messageDropTime) {
-                                       i.remove();
-                                       messageLifeTime = now - 
m.localInstantiationTime;
-                                       if ((m.getSource()) instanceof 
PeerNode) {
-                                               Logger.normal(this, "Dropping 
unclaimed from "+m.getSource().getPeer()+", lived 
"+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": "+m);
-                                       } else {
-                                               Logger.normal(this, "Dropping 
unclaimed, lived "+TimeUtil.formatTime(messageLifeTime, 2, true)+" (age)"+": 
"+m);
-                                       }
-                               }
-                       }
-                       if (ret == null) {
-                               if(logMINOR) Logger.minor(this, "Not in 
_unclaimed");
-                           // Insert filter into filter list in order of 
timeout
-                               ListIterator i = _filters.listIterator();
-                               while (true) {
-                                       if (!i.hasNext()) {
-                                               i.add(filter);
-                                               if(logMINOR) Logger.minor(this, 
"Added at end");
-                                               break;
-                                       }
-                                       MessageFilter mf = (MessageFilter) 
i.next();
-                                       if (mf.getTimeout() > 
filter.getTimeout()) {
-                                               i.previous();
-                                               i.add(filter);
-                                               if(logMINOR) Logger.minor(this, 
"Added in middle - mf timeout="+mf.getTimeout()+" - my 
timeout="+filter.getTimeout());
-                                               break;
-                                       }
-                               }
-                       }
-               }
-               long tEnd = System.currentTimeMillis();
-               if(tEnd - now > 50) {
-                       if(tEnd - now > 3000)
-                               Logger.error(this, "waitFor _unclaimed 
iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of "+_unclaimed.size()+" 
for ret of "+ret);
-                       else
-                               if(logMINOR) Logger.minor(this, "waitFor 
_unclaimed iteration took "+(tEnd-now)+"ms with unclaimedFIFOSize of 
"+_unclaimed.size()+" for ret of "+ret);
-               }
-               // Unlock to wait on filter
-               // Waiting on the filter won't release the outer lock
-               // So we have to release it here
-               if(ret == null) {       
-                       if(logMINOR) Logger.minor(this, "Waiting...");
-                       synchronized (filter) {
-                               try {
-                                       // Precaution against filter getting 
matched between being added to _filters and
-                                       // here - bug discovered by Mason
-                                   boolean fmatched = false;
-                                   while(!(fmatched = (filter.matched() || 
(filter.droppedConnection() != null)))) {
-                                       long wait = 
filter.getTimeout()-System.currentTimeMillis();
-                                       if(wait > 0)
-                                           filter.wait(wait);
-                                       else break;
-                                       }
-                                   if(filter.droppedConnection() != null)
-                                       throw new DisconnectedException();
-                                   if(logMINOR) Logger.minor(this, "Matched: 
"+fmatched);
-                               } catch (InterruptedException e) {
-                               }
-                               ret = filter.getMessage();
-                               filter.clearMatched();
-                       }
-                       if(logDEBUG) Logger.debug(this, "Returning "+ret+" from 
"+filter);
-               } else {
-                       // Matched an unclaimed packet
-                       filter.onMatched();
-                       filter.clearMatched();
-               }
-               // Probably get rid...
-//             if (Dijjer.getDijjer().getDumpMessageWaitTimes() != null) {
-//                     
Dijjer.getDijjer().getDumpMessageWaitTimes().println(filter.toString() + "\t" + 
filter.getInitialTimeout() + "\t"
-//                                     + (System.currentTimeMillis() - 
startTime));
-//                     Dijjer.getDijjer().getDumpMessageWaitTimes().flush();
-//             }
-               long endTime = System.currentTimeMillis();
-               if(logDEBUG) Logger.debug(this, "Returning in 
"+(endTime-startTime)+"ms");
-               if((ctr != null) && (ret != null))
-                       ctr.receivedBytes(ret._receivedByteCount);
-               return ret;
-       }
-
-       /**
-        * Send a Message to a PeerContext.
-        * @throws NotConnectedException If we are not currently connected to 
the node.
-        */
-       public void send(PeerContext destination, Message m, ByteCounter ctr) 
throws NotConnectedException {
-           if(m.getSpec().isInternalOnly()) {
-               Logger.error(this, "Trying to send internal-only message "+m+" 
of spec "+m.getSpec(), new Exception("debug"));
-               return;
-           }
-               if ((m.getSpec().equals(DMT.ping) || 
m.getSpec().equals(DMT.pong)) && logMINOR) {
-                       if(Logger.shouldLog(Logger.DEBUG, this))
-                               Logger.debug(this, "" + 
(System.currentTimeMillis() % 60000) + ' ' + _sock.getPort() + " -> " + 
destination
-                                               + " : " + m);
-               } else {
-                       if(logMINOR) Logger.minor(this, "" + 
(System.currentTimeMillis() % 60000) + ' ' + _sock.getPort() + " -> " + 
destination
-                                       + " : " + m);
-               }
-//             byte[] blockToSend = m.encodeToPacket(lowLevelFilter, 
destination);
-//             if(lowLevelFilter != null) {
-//                     try {
-//                             lowLevelFilter.processOutgoing(blockToSend, 0, 
blockToSend.length, destination);
-//                             return;
-//                     } catch (IncomingPacketFilterException t) {
-//                             Logger.error(this, "Caught "+t+" sending "+m+" 
to "+destination, t);
-//                             destination.forceDisconnect();
-//                             throw new NotConnectedException("Error 
"+t.toString()+" forced disconnect");
-//                     }
-//             } else {
-//                 sendPacket(blockToSend, destination.getPeer());
-//             }
-               destination.sendAsync(m, null, 0, ctr);
-       }
-
-       /**
-        * Send a block of encoded bytes to a peer. This is called by
-        * send, and by IncomingPacketFilter.processOutgoing(..).
-     * @param blockToSend The data block to send.
-     * @param destination The peer to send it to.
-     */
-    public void sendPacket(byte[] blockToSend, Peer destination, boolean 
allowLocalAddresses) throws LocalAddressException {
-       assert(blockToSend != null);
-               // there should be no DNS needed here, but go ahead if we can, 
but complain doing it
-               if( destination.getAddress(false, allowLocalAddresses) == null 
) {
-                       Logger.error(this, "Tried sending to destination 
without pre-looked up IP address(needs a real Peer.getHostname()): null:" + 
destination.getPort(), new Exception("error"));
-                       if( destination.getAddress(true, allowLocalAddresses) 
== null ) {
-                               Logger.error(this, "Tried sending to bad 
destination address: null:" + destination.getPort(), new Exception("error"));
-                               return;
-                       }
-               }
-               if (_dropProbability > 0) {
-                       if (dropRandom.nextInt() % _dropProbability == 0) {
-                               if(logMINOR) Logger.minor(this, "DROPPED: " + 
_sock.getLocalPort() + " -> " + destination.getPort());
-                               return;
-                       }
-               }
-               InetAddress address = destination.getAddress(false, 
allowLocalAddresses);
-               assert(address != null);
-               int port = destination.getPort();
-               DatagramPacket packet = new DatagramPacket(blockToSend, 
blockToSend.length);
-               packet.setAddress(address);
-               packet.setPort(port);
-               
-               // TODO: keep?
-               // packet.length() is simply the size of the buffer, it knows 
nothing of UDP headers
-               IOStatisticCollector.addInfo(address + ":" + port, 0, 
blockToSend.length + UDP_HEADERS_LENGTH); 
-               
-               try {
-                       _sock.send(packet);
-               } catch (IOException e) {
-                       if(packet.getAddress() instanceof Inet6Address)
-                               Logger.normal(this, "Error while sending packet 
to IPv6 address: "+destination+": "+e, e);
-                       else
-                               Logger.error(this, "Error while sending packet 
to " + destination+": "+e, e);
-               }
-    }
-
-    public void close(boolean exit) {
-       Logger.error(this, "Closing.", new Exception("error"));
-               synchronized (this) {
-                       while (!_isDone) {
-                               try {
-                                       wait(2000);
-                               } catch (InterruptedException e) {
-                                       e.printStackTrace();
-                               }
-                       }
-               }
-               if (exit) {
-                       _sock.close();
-               } else {
-                   Logger.fatal(this, 10, "Not implemented: close(false)");
-                       //Updater.saveResource(_sock);
-               }
-       }
-
-       public void setDispatcher(Dispatcher d) {
-               _dispatcher = d;
-       }
-
-       /** Must be called, or we will NPE */
-       public void setLowLevelFilter(IncomingPacketFilter f) {
-           lowLevelFilter = f;
-       }
-       
-       public String toString() {
-               return _sock.getLocalAddress() + ":" + _sock.getLocalPort();
-       }
-
-       public void setDropProbability(int dropProbability) {
-               _dropProbability = dropProbability;
-       }
-
-       public static UdpSocketManager getUdpSocketManager()
-       {
-               return _usm;
-       }
-
-//     public static void init(int externalListenPort, InetAddress bindto)
-//             throws SocketException
-//     {
-//             _usm = new UdpSocketManager(externalListenPort, bindto);
-//     }
-//
-    public int getPortNumber() {
-        return _sock.getLocalPort();
-    }
-
-    
-       // CompuServe use 1400 MTU; AOL claim 1450; DFN at home use 1448.
-       // http://info.aol.co.uk/broadband/faqHomeNetworking.adp
-       // 
http://www.compuserve.de/cso/hilfe/linux/hilfekategorien/installation/contentview.jsp?conid=385700
-       // http://www.studenten-ins-netz.net/inhalt/service_faq.html
-       // officially GRE is 1476 and PPPoE is 1492.
-       // unofficially, PPPoE is often 1472 (seen in the wild). Also PPPoATM 
is sometimes 1472.
-    static final int MAX_ALLOWED_MTU = 1400;
-    // FIXME this is different for IPv6 (check all uses of constant when 
fixing)
-    public static final int UDP_HEADERS_LENGTH = 28;
-    
-    /**
-     * @return The maximum packet size supported by this SocketManager, not 
including transport (UDP/IP) headers.
-     */
-    public int getMaxPacketSize() { //FIXME: what about passing a peerNode 
though and doing it on a per-peer basis?
-       final int minAdvertisedMTU = node.ipDetector.getMinimumDetectedMTU();
-       
-       // We don't want the MTU detection thingy to prevent us to send 
PacketTransmits!
-       if(minAdvertisedMTU < 1100){
-               Logger.error(this, "It shouldn't happen : we disabled the MTU 
detection algorithm because the advertised MTU is smallish !! 
("+node.ipDetector.getMinimumDetectedMTU()+')'); 
-               return MAX_ALLOWED_MTU - UDP_HEADERS_LENGTH;
-       } else
-               return (minAdvertisedMTU < MAX_ALLOWED_MTU ? minAdvertisedMTU : 
MAX_ALLOWED_MTU) - UDP_HEADERS_LENGTH;
-       // UDP/IP header is 28 bytes.
-    }
-
-       /**
-        * @return the number of received messages that are currently unclaimed
-        */
-       public int getUnclaimedFIFOSize() {
-               synchronized (_filters){
-                       return _unclaimed.size();
-               }
-       }
-       
-       public Map getUnclaimedFIFOMessageCounts() {
-               Map messageCounts = new HashMap();
-               synchronized(_filters) {
-                       for (ListIterator i = _unclaimed.listIterator(); 
i.hasNext();) {
-                               Message m = (Message) i.next();
-                               String messageName = m.getSpec().getName();
-                               Integer messageCount = (Integer) 
messageCounts.get(messageName);
-                               if (messageCount == null) {
-                                       messageCounts.put(messageName, new 
Integer(1) );
-                               } else {
-                                       messageCount = new 
Integer(messageCount.intValue() + 1);
-                                       messageCounts.put(messageName, 
messageCount );
-                               }
-                       }
-               }
-               return messageCounts;
-       }
-
-       public int getDropProbability() {
-               return _dropProbability;
-       }
-}

Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2007-06-27 
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2007-06-27 
21:00:20 UTC (rev 13784)
@@ -29,7 +29,7 @@
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
 import freenet.io.comm.RetrievalException;
-import freenet.io.comm.UdpSocketManager;
+import freenet.io.comm.MessageCore;
 import freenet.support.BitArray;
 import freenet.support.Buffer;
 import freenet.support.Logger;
@@ -47,12 +47,12 @@
        PartiallyReceivedBlock _prb;
        PeerContext _sender;
        long _uid;
-       UdpSocketManager _usm;
+       MessageCore _usm;
        /** packet : Integer -> reportTime : Long * */
        HashMap _recentlyReportedMissingPackets = new HashMap();
        ByteCounter _ctr;

-       public BlockReceiver(UdpSocketManager usm, PeerContext sender, long 
uid, PartiallyReceivedBlock prb, ByteCounter ctr) {
+       public BlockReceiver(MessageCore usm, PeerContext sender, long uid, 
PartiallyReceivedBlock prb, ByteCounter ctr) {
                _sender = sender;
                _prb = prb;
                _uid = uid;

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2007-06-27 
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2007-06-27 
21:00:20 UTC (rev 13784)
@@ -26,11 +26,10 @@
 import freenet.io.comm.DMT;
 import freenet.io.comm.DisconnectedException;
 import freenet.io.comm.Message;
+import freenet.io.comm.MessageCore;
 import freenet.io.comm.MessageFilter;
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
-import freenet.io.comm.UdpSocketManager;
-import freenet.node.FNPPacketMangler;
 import freenet.node.PeerNode;
 import freenet.support.BitArray;
 import freenet.support.DoubleTokenBucket;
@@ -46,7 +45,7 @@
        public static final int SEND_TIMEOUT = 60000;
        public static final int PING_EVERY = 8;

-       UdpSocketManager _usm;
+       MessageCore _usm;
        PeerContext _destination;
        boolean _sendComplete;
        long _uid;
@@ -61,7 +60,7 @@
        final ByteCounter _ctr;
        final int PACKET_SIZE;

-       public BlockTransmitter(UdpSocketManager usm, PeerContext destination, 
long uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle, 
ByteCounter ctr) {
+       public BlockTransmitter(MessageCore usm, PeerContext destination, long 
uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle, 
ByteCounter ctr) {
                _usm = usm;
                _destination = destination;
                _uid = uid;
@@ -69,7 +68,7 @@
                _ctr = ctr;
                _masterThrottle = masterThrottle;
                PACKET_SIZE = DMT.packetTransmitSize(_prb._packetSize, 
_prb._packets)
-                       + FNPPacketMangler.FULL_HEADERS_LENGTH_ONE_MESSAGE;
+                       + 
destination.getOutgoingMangler().fullHeadersLengthOneMessage();
                try {
                        _sentPackets = new BitArray(_prb.getNumPackets());
                } catch (AbortedException e) {
@@ -232,7 +231,7 @@
                                }
                                if(logMINOR) Logger.minor(this, "Got "+msg);
                                if(!_destination.isConnected()) {
-                                       Logger.normal(this, "Terminating send 
"+_uid+" to "+_destination+" from "+_usm.getPortNumber()+" because node 
disconnected while waiting");
+                                       Logger.normal(this, "Terminating send 
"+_uid+" to "+_destination+" from "+_destination.getSocketHandler()+" because 
node disconnected while waiting");
                                        synchronized(_senderThread) {
                                                _sendComplete = true;
                                                _senderThread.notifyAll();
@@ -249,7 +248,7 @@
                                                        _sendComplete = true;
                                                        
_senderThread.notifyAll();
                                                }
-                                               Logger.error(this, "Terminating 
send "+_uid+" to "+_destination+" from "+_usm.getPortNumber()+" as we haven't 
heard from receiver in "+TimeUtil.formatTime((now - timeAllSent), 2, true)+ 
'.');
+                                               Logger.error(this, "Terminating 
send "+_uid+" to "+_destination+" from "+_destination.getSocketHandler()+" as 
we haven't heard from receiver in "+TimeUtil.formatTime((now - timeAllSent), 2, 
true)+ '.');
                                                return false;
                                        } else {
                                                if(logMINOR) Logger.minor(this, 
"Ignoring timeout: timeAllSent="+timeAllSent+" ("+(System.currentTimeMillis() - 
timeAllSent)+"), getNumSent="+getNumSent()+ '/' +_prb.getNumPackets());

Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2007-06-27 
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2007-06-27 
21:00:20 UTC (rev 13784)
@@ -41,6 +41,7 @@
        final DoubleTokenBucket masterThrottle;
        private boolean sentCancel;
        private boolean finished;
+       final int packetSize;

        public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer, 
long uid, DoubleTokenBucket masterThrottle) throws DisconnectedException {
                this.prb = prb;
@@ -88,6 +89,8 @@
                        cancel();
                        throw e;
                }
+               packetSize = DMT.bulkPacketTransmitSize(prb.blockSize) +
+                       peer.getOutgoingMangler().fullHeadersLengthOneMessage();
        }

        /**
@@ -146,7 +149,6 @@
         * @return True if the file was successfully sent. False otherwise.
         */
        public boolean send() {
-               int packetSize = prb.getPacketSize();
                long lastSentPacket = System.currentTimeMillis();
                while(true) {
                        if(prb.isAborted()) return false;

Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java        
2007-06-27 17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java        
2007-06-27 21:00:20 UTC (rev 13784)
@@ -5,10 +5,8 @@

 import java.io.IOException;

-import freenet.io.comm.DMT;
+import freenet.io.comm.MessageCore;
 import freenet.io.comm.RetrievalException;
-import freenet.io.comm.UdpSocketManager;
-import freenet.node.FNPPacketMangler;
 import freenet.support.BitArray;
 import freenet.support.Logger;
 import freenet.support.io.RandomAccessThing;
@@ -31,11 +29,10 @@
        private final BitArray blocksReceived;
        final int blocks;
        private BulkTransmitter[] transmitters;
-       final UdpSocketManager usm;
+       final MessageCore usm;
        /** The one and only BulkReceiver */
        private BulkReceiver recv;
        private int blocksReceivedCount;
-       final int packetSize;
        // Abort status
        boolean _aborted;
        int _abortReason;
@@ -49,7 +46,7 @@
         * @param initialState If true, assume all blocks have been received. 
If false, assume no blocks have
         * been received.
         */
-       public PartiallyReceivedBulk(UdpSocketManager usm, long size, int 
blockSize, RandomAccessThing raf, boolean initialState) {
+       public PartiallyReceivedBulk(MessageCore usm, long size, int blockSize, 
RandomAccessThing raf, boolean initialState) {
                this.size = size;
                this.blockSize = blockSize;
                this.raf = raf;
@@ -63,8 +60,6 @@
                        blocksReceived.setAllOnes();
                        blocksReceivedCount = this.blocks;
                }
-               packetSize = DMT.bulkPacketTransmitSize(blockSize) + 
-                       FNPPacketMangler.FULL_HEADERS_LENGTH_ONE_MESSAGE; // 
FIXME generalise
        }

        /**
@@ -153,10 +148,6 @@
                return _aborted;
        }

-       public int getPacketSize() {
-               return packetSize;
-       }
-
        public boolean hasWholeFile() {
                return blocksReceivedCount >= blocks;
        }

Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java        2007-06-27 
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java        2007-06-27 
21:00:20 UTC (rev 13784)
@@ -38,7 +38,8 @@
        private static boolean logMINOR;
     final Node node;
     final PeerManager pm;
-    final UdpSocketManager usm;
+    final MessageCore usm;
+    final PacketSocketHandler sock;
     final EntropySource fnpTimingSource;
     final EntropySource myPacketDataSource;
     private static final int MAX_PACKETS_IN_FLIGHT = 256; 
@@ -64,17 +65,18 @@
        static public final int HEADERS_LENGTH_ONE_MESSAGE = 
                HEADERS_LENGTH_MINIMUM + 2; // 2 bytes = length of message. 
rest is the same.

-       static public final int FULL_HEADERS_LENGTH_MINIMUM = 
-               HEADERS_LENGTH_MINIMUM + UdpSocketManager.UDP_HEADERS_LENGTH;
-       static public final int FULL_HEADERS_LENGTH_ONE_MESSAGE =
-               HEADERS_LENGTH_ONE_MESSAGE + 
UdpSocketManager.UDP_HEADERS_LENGTH;
-    
-    public FNPPacketMangler(Node node) {
+       final int fullHeadersLengthMinimum;
+       final int fullHeadersLengthOneMessage;
+       
+    public FNPPacketMangler(Node node, PacketSocketHandler sock) {
         this.node = node;
         this.pm = node.peers;
         this.usm = node.usm;
+        this.sock = sock;
         fnpTimingSource = new EntropySource();
         myPacketDataSource = new EntropySource();
+        fullHeadersLengthMinimum = HEADERS_LENGTH_MINIMUM + 
sock.getHeadersLength();
+        fullHeadersLengthOneMessage = HEADERS_LENGTH_ONE_MESSAGE + 
sock.getHeadersLength();
     }

     /**
@@ -432,7 +434,7 @@
      */
     private void sendAuthPacket(byte[] output, PeerNode pn, Peer replyTo) {
         int length = output.length;
-        if(length > node.usm.getMaxPacketSize()) {
+        if(length > sock.getMaxPacketSize()) {
             throw new IllegalStateException("Cannot send auth packet: too 
long: "+length);
         }
         BlockCipher cipher = pn.outgoingSetupCipher;
@@ -469,7 +471,7 @@
                Peer p = pn.getPeer();
                if(p != null) replyTo = p;
        }
-       usm.sendPacket(data, replyTo, pn.allowLocalAddresses());
+       sock.sendPacket(data, replyTo, pn.allowLocalAddresses());
        pn.reportOutgoingBytes(data.length);
        node.outputThrottle.forceGrab(data.length - alreadyReportedBytes);
        }
@@ -889,7 +891,7 @@
             ptr+=length;
             if(m != null) {
                 //Logger.minor(this, "Dispatching packet: "+m);
-                usm.checkFilters(m);
+                usm.checkFilters(m, sock);
             }
         }
         if(logMINOR) Logger.minor(this, "Done");
@@ -932,7 +934,7 @@
                     }
                     int packetNumber = 
kt.allocateOutgoingPacketNumberNeverBlock();
                     this.processOutgoingPreformatted(buf, 0, buf.length, 
pn.getCurrentKeyTracker(), packetNumber, mi.cb, mi.alreadyReportedBytes);
-                    mi.onSent(buf.length + FULL_HEADERS_LENGTH_ONE_MESSAGE);
+                    mi.onSent(buf.length + fullHeadersLengthOneMessage);
                 } catch (NotConnectedException e) {
                     Logger.normal(this, "Caught "+e+" while sending messages 
("+mi_name+") to "+pn.getPeer()+requeueLogString);
                     // Requeue
@@ -969,7 +971,7 @@
             } else {
                 byte[] data = mi.getData(pn);
                 messageData[x] = data;
-                if(data.length > node.usm.getMaxPacketSize()) {
+                if(data.length > sock.getMaxPacketSize()) {
                     Logger.error(this, "Message exceeds packet size: 
"+messages[i]+" size "+data.length+" message "+mi.msg);
                     // Will be handled later
                 }
@@ -1003,7 +1005,7 @@
         }
         if(x != callbacksCount) throw new IllegalStateException();

-        if((length + HEADERS_LENGTH_MINIMUM < node.usm.getMaxPacketSize()) &&
+        if((length + HEADERS_LENGTH_MINIMUM < sock.getMaxPacketSize()) &&
                 (messageData.length < 256)) {
                        mi_name = null;
             try {
@@ -1011,7 +1013,7 @@
                 for(int i=0;i<messageData.length;i++) {
                        MessageItem mi = newMsgs[i];
                                        mi_name = (mi.msg == null ? "(not a 
Message)" : mi.msg.getSpec().getName());
-                                       mi.onSent(messageData[i].length + 2 + 
(FULL_HEADERS_LENGTH_MINIMUM / messageData.length));
+                                       mi.onSent(messageData[i].length + 2 + 
(fullHeadersLengthMinimum / messageData.length));
                 }
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Caught "+e+" while sending messages 
("+mi_name+") to "+pn.getPeer()+requeueLogString);
@@ -1048,7 +1050,7 @@
                 else thisLength = (messageData[i].length + 2);
                 int newLength = length + thisLength;
                 count++;
-                if((newLength + HEADERS_LENGTH_MINIMUM > 
node.usm.getMaxPacketSize()) || (count > 255) || (i == messages.length)) {
+                if((newLength + HEADERS_LENGTH_MINIMUM > 
sock.getMaxPacketSize()) || (count > 255) || (i == messages.length)) {
                     // lastIndex up to the message right before this one
                     // e.g. lastIndex = 0, i = 1, we just send message 0
                     if(lastIndex != i) {
@@ -1058,7 +1060,7 @@
                             for(int j=lastIndex;j<i;j++) {
                                MessageItem mi = newMsgs[j];
                                                                mi_name = 
(mi.msg == null ? "(not a Message)" : mi.msg.getSpec().getName());
-                                                               
mi.onSent(messageData[j].length + 2 + (FULL_HEADERS_LENGTH_MINIMUM / 
(i-lastIndex)));
+                                                               
mi.onSent(messageData[j].length + 2 + (fullHeadersLengthMinimum / 
(i-lastIndex)));
                             }
                         } catch (NotConnectedException e) {
                             Logger.normal(this, "Caught "+e+" while sending 
messages ("+mi_name+") to "+pn.getPeer()+requeueLogString);
@@ -1548,4 +1550,12 @@
        public int[] supportedNegTypes() {
                return new int[] { 1 };
        }
+
+       public int fullHeadersLengthOneMessage() {
+               return fullHeadersLengthOneMessage;
+       }
+
+       public SocketHandler getSocketHandler() {
+               return sock;
+       }
 }

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2007-06-27 17:55:09 UTC (rev 
13783)
+++ trunk/freenet/src/freenet/node/Node.java    2007-06-27 21:00:20 UTC (rev 
13784)
@@ -62,11 +62,12 @@
 import freenet.io.comm.DisconnectedException;
 import freenet.io.comm.FreenetInetAddress;
 import freenet.io.comm.Message;
+import freenet.io.comm.MessageCore;
 import freenet.io.comm.MessageFilter;
 import freenet.io.comm.Peer;
 import freenet.io.comm.PeerParseException;
 import freenet.io.comm.ReferenceSignatureVerificationException;
-import freenet.io.comm.UdpSocketManager;
+import freenet.io.comm.UdpSocketHandler;
 import freenet.io.xfer.PartiallyReceivedBlock;
 import freenet.keys.CHKBlock;
 import freenet.keys.CHKVerifyException;
@@ -369,7 +370,10 @@
        public final RandomSource random;
        /** Weak but fast RNG */
        public final Random fastWeakRandom;
-       final UdpSocketManager usm;
+       /** The object which handles incoming messages and allows us to wait 
for them */
+       final MessageCore usm;
+       /** The object which handles our specific UDP port, pulls messages from 
it, feeds them to the packet mangler for decryption etc */
+       final UdpSocketHandler sock;
        final FNPPacketMangler packetMangler;
        final DNSRequester dnsr;
        public final PacketSender ps;
@@ -791,8 +795,10 @@
                        port=-1;
                }

-               UdpSocketManager u = null;
+               usm = new MessageCore();

+               UdpSocketHandler u = null;
+               
                if(port > 65535) {
                        throw new NodeInitException(EXIT_IMPOSSIBLE_USM_PORT, 
"Impossible port number: "+port);
                } else if(port == -1) {
@@ -800,7 +806,7 @@
                        for(int i=0;i<200000;i++) {
                                int portNo = 1024 + random.nextInt(65535-1024);
                                try {
-                                       u = new UdpSocketManager(portNo, 
InetAddress.getByName(bindto), this);
+                                       u = new UdpSocketHandler(portNo, 
InetAddress.getByName(bindto), this);
                                        port = u.getPortNumber();
                                        break;
                                } catch (Exception e) {
@@ -814,12 +820,12 @@
                                throw new 
NodeInitException(EXIT_NO_AVAILABLE_UDP_PORTS, "Could not find an available UDP 
port number for FNP (none specified)");
                } else {
                        try {
-                               u = new UdpSocketManager(port, 
InetAddress.getByName(bindto), this);
+                               u = new UdpSocketHandler(port, 
InetAddress.getByName(bindto), this);
                        } catch (Exception e) {
                                throw new 
NodeInitException(EXIT_IMPOSSIBLE_USM_PORT, "Could not bind to port: "+port+" 
(node already running?)");
                        }
                }
-               usm = u;
+               sock = u;

                Logger.normal(this, "FNP port created on "+bindto+ ':' +port);
                System.out.println("FNP port created on "+bindto+ ':' +port);
@@ -829,17 +835,17 @@
                                new IntCallback() {

                                        public int get() {
-                                               return usm.getDropProbability();
+                                               return 
((UdpSocketHandler)sock).getDropProbability();
                                        }

                                        public void set(int val) throws 
InvalidConfigValueException {
-                                               usm.setDropProbability(val);
+                                               
((UdpSocketHandler)sock).setDropProbability(val);
                                        }

                });

                int dropProb = nodeConfig.getInt("testingDropPacketsEvery");
-               usm.setDropProbability(dropProb);
+               ((UdpSocketHandler)sock).setDropProbability(dropProb);

                Logger.normal(Node.class, "Creating node...");

@@ -995,7 +1001,7 @@
                peers.updatePMUserAlert();

                usm.setDispatcher(dispatcher=new NodeDispatcher(this));
-               usm.setLowLevelFilter(packetMangler = new 
FNPPacketMangler(this));
+               sock.setLowLevelFilter(packetMangler = new 
FNPPacketMangler(this, sock));

                // Extra Peer Data Directory
                nodeConfig.register("extraPeerDataDir", new File(nodeDir, 
"extra-peer-data-"+portNumber).toString(), sortOrder++, true, false, 
"Node.extraPeerDir", "Node.extraPeerDirLong",
@@ -1439,8 +1445,10 @@
                ps.start(nodeStats);
                peers.start(); // must be before usm
                nodeStats.start();
-               usm.start(disableHangCheckers);

+               usm.start(ps);
+               ((UdpSocketHandler)sock).start(disableHangCheckers);
+               
                if(isUsingWrapper()) {
                        Logger.normal(this, "Using wrapper correctly: 
"+nodeStarter);
                        System.out.println("Using wrapper correctly: 
"+nodeStarter);
@@ -2663,7 +2671,7 @@
          return myName;
        }

-       public UdpSocketManager getUSM() {
+       public MessageCore getUSM() {
          return usm;
        }


Modified: trunk/freenet/src/freenet/node/NodeIPDetector.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeIPDetector.java  2007-06-27 17:55:09 UTC 
(rev 13783)
+++ trunk/freenet/src/freenet/node/NodeIPDetector.java  2007-06-27 21:00:20 UTC 
(rev 13784)
@@ -11,7 +11,7 @@
 import freenet.config.SubConfig;
 import freenet.io.comm.FreenetInetAddress;
 import freenet.io.comm.Peer;
-import freenet.io.comm.UdpSocketManager;
+import freenet.io.comm.MessageCore;
 import freenet.l10n.L10n;
 import freenet.node.useralerts.IPUndetectedUserAlert;
 import freenet.node.useralerts.SimpleUserAlert;
@@ -86,9 +86,9 @@
                                addedValidIP = true;
                }
                boolean dontDetect = false;
-               UdpSocketManager usm = node.usm;
+               MessageCore usm = node.usm;
                if(usm != null) {
-                       InetAddress addr = node.usm.getBindTo();
+                       InetAddress addr = node.sock.getBindTo();
                        if(addr != null && (IPUtil.isValidAddress(addr, 
false))) {
                                dontDetect = true;
                                Peer p = new Peer(addr, node.portNumber);

Modified: trunk/freenet/src/freenet/node/NodeStats.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeStats.java       2007-06-27 17:55:09 UTC 
(rev 13783)
+++ trunk/freenet/src/freenet/node/NodeStats.java       2007-06-27 21:00:20 UTC 
(rev 13784)
@@ -302,6 +302,9 @@
                        new TokenBucket(Math.max(obwLimit*60, 32768*20), 
(int)((1000L*1000L*1000L) / (obwLimit * 
FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), 0);
                requestInputThrottle = 
                        new TokenBucket(Math.max(ibwLimit*60, 32768*20), 
(int)((1000L*1000L*1000L) / (ibwLimit * 
FRACTION_OF_BANDWIDTH_USED_BY_REQUESTS)), 0);
+               
+               estimatedSizeOfOneThrottledPacket = 1024 + 
DMT.packetTransmitSize(1024, 32) + 
+                       node.packetMangler.fullHeadersLengthOneMessage();
        }

        protected String l10n(String key) {
@@ -321,9 +324,7 @@

        private long lastAcceptedRequest = -1;

-       static final int ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET = 
-               1024 + DMT.packetTransmitSize(1024, 32)
-               + FNPPacketMangler.FULL_HEADERS_LENGTH_ONE_MESSAGE;
+       final int estimatedSizeOfOneThrottledPacket;

        final Runnable throttledPacketSendAverageIdleUpdater =
                new Runnable() {
@@ -332,8 +333,8 @@
                                try {
                                        
if(throttledPacketSendAverage.lastReportTime() < now - 5000) {  // if last 
report more than 5 seconds ago
                                                // shouldn't take long
-                                               
node.outputThrottle.blockingGrab(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
-                                               
node.outputThrottle.recycle(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
+                                               
node.outputThrottle.blockingGrab(estimatedSizeOfOneThrottledPacket);
+                                               
node.outputThrottle.recycle(estimatedSizeOfOneThrottledPacket);
                                                long after = 
System.currentTimeMillis();
                                                // Report time it takes to grab 
the bytes.
                                                
throttledPacketSendAverage.report(after - now);

Modified: trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/OutgoingPacketMangler.java   2007-06-27 
17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/node/OutgoingPacketMangler.java   2007-06-27 
21:00:20 UTC (rev 13784)
@@ -6,6 +6,7 @@
 import freenet.io.comm.AsyncMessageCallback;
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
+import freenet.io.comm.SocketHandler;
 import freenet.support.WouldBlockException;

 /**
@@ -78,5 +79,15 @@
         * List of supported negotiation types in preference order (best last)
         */
        public int[] supportedNegTypes();
+       
+       /**
+        * Size of the packet headers, in bytes, assuming only one message in 
this packet.
+        */
+       public int fullHeadersLengthOneMessage();

+       /**
+        * The SocketHandler we are connected to.
+        */
+       public SocketHandler getSocketHandler();
+
 }

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2007-06-27 17:55:09 UTC 
(rev 13783)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2007-06-27 21:00:20 UTC 
(rev 13784)
@@ -55,6 +55,7 @@
 import freenet.io.comm.PeerParseException;
 import freenet.io.comm.ReferenceSignatureVerificationException;
 import freenet.io.comm.RetrievalException;
+import freenet.io.comm.SocketHandler;
 import freenet.io.xfer.BulkReceiver;
 import freenet.io.xfer.BulkTransmitter;
 import freenet.io.xfer.PacketThrottle;
@@ -113,6 +114,9 @@
     /** My low-level address for SocketManager purposes */
     private Peer detectedPeer;

+    /** My OutgoingPacketMangler i.e. the object which encrypts packets sent 
to this node */
+    private OutgoingPacketMangler outgoingMangler;
+    
     /** Advertised addresses */
     private Vector nominalPeer;

@@ -3954,4 +3958,12 @@
                        // Ignore
                }
        }
+
+       public OutgoingPacketMangler getOutgoingMangler() {
+               return outgoingMangler;
+       }
+
+       public SocketHandler getSocketHandler() {
+               return outgoingMangler.getSocketHandler();
+       }
 }

Modified: 
trunk/freenet/src/freenet/node/simulator/RealNodeRequestInsertTest.java
===================================================================
--- trunk/freenet/src/freenet/node/simulator/RealNodeRequestInsertTest.java     
2007-06-27 17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/node/simulator/RealNodeRequestInsertTest.java     
2007-06-27 21:00:20 UTC (rev 13784)
@@ -41,7 +41,7 @@
         new File(wd).mkdir();
         NodeStarter.globalTestInit(wd); // ignore Random, using our own
         // Don't clobber nearby nodes!
-        Logger.setupStdoutLogging(Logger.DEBUG, 
"freenet.store:minor,freenet.node.Location:normal" 
/*"freenet.node.LocationManager:debug,freenet.node.FNPPacketManager:normal,freenet.io.comm.UdpSocketManager:debug"*/);
+        Logger.setupStdoutLogging(Logger.DEBUG, 
"freenet.store:minor,freenet.node.Location:normal" 
/*"freenet.node.LocationManager:debug,freenet.node.FNPPacketManager:normal,freenet.io.comm.MessageCore:debug"*/);
         Logger.globalSetThreshold(Logger.DEBUG);
         System.out.println("Insert/retrieve test");
         System.out.println();

Modified: trunk/freenet/src/freenet/node/simulator/RealNodeRoutingTest.java
===================================================================
--- trunk/freenet/src/freenet/node/simulator/RealNodeRoutingTest.java   
2007-06-27 17:55:09 UTC (rev 13783)
+++ trunk/freenet/src/freenet/node/simulator/RealNodeRoutingTest.java   
2007-06-27 21:00:20 UTC (rev 13784)
@@ -33,7 +33,7 @@
     static final short MAX_HTL = (short)6;

     public static void main(String[] args) throws FSParseException, 
PeerParseException, InvalidThresholdException, NodeInitException, 
ReferenceSignatureVerificationException {
-        Logger.setupStdoutLogging(Logger.NORMAL, 
"freenet.node.CPUAdjustingSwapRequestInterval:minor" 
/*"freenet.node.LocationManager:debug,freenet.node.FNPPacketManager:normal,freenet.io.comm.UdpSocketManager:debug"*/);
+        Logger.setupStdoutLogging(Logger.NORMAL, 
"freenet.node.CPUAdjustingSwapRequestInterval:minor" 
/*"freenet.node.LocationManager:debug,freenet.node.FNPPacketManager:normal,freenet.io.comm.MessageCore:debug"*/);
         System.out.println("Routing test using real nodes:");
         System.out.println();
         String wd = "realNodeRequestInsertTest";


Reply via email to