Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv11501/src/freenet

Modified Files:
        ConnectionHandler.java ConnectionHandlerComparator.java 
        Core.java MessageSendCallback.java OpenConnectionManager.java 
        PeerHandler.java PeerPacket.java PeerPacketMessage.java 
        Version.java 
Log Message:
6217:
Implement PeerHandler. Web interface works. FCP does not work. Probably has 
significant bugs. This is unstable after all... and others may want to look at it.


Index: ConnectionHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandler.java,v
retrieving revision 1.163
retrieving revision 1.164
diff -u -r1.163 -r1.164
--- ConnectionHandler.java      3 Oct 2003 08:35:19 -0000       1.163
+++ ConnectionHandler.java      4 Oct 2003 01:16:55 -0000       1.164
@@ -97,8 +97,6 @@
     /**  Count objects for number of sends in progress of pending */
     //private final Count sending=new Count(0, sendLock);
     // AFM (another fucking monitor)
-    private volatile int sendingCount = 0;
-    
 
     private volatile long sendQueueSize    = 0;
     private volatile long totalDataSent =0;
@@ -114,6 +112,8 @@
        private final Object trailerSendLock = new Object();
        private int sendingTrailerChunkBytes = 0;
[...1819 lines suppressed...]
@@ -3069,6 +2792,10 @@
                return peer;
        }
        
+       public final PeerHandler getPeerHandler() {
+               return peerHandler;
+       }
+       
     /**
      [EMAIL PROTECTED]    identity of the Peer on the other end of the connection
      */
@@ -3451,7 +3178,7 @@
 //     }
        
        public String toString() {
-               return super.toString()+" for "+conn+","+link+", sending 
"+sentMessagesCount;
+               return super.toString()+" for "+conn+","+link+", sending "+sentPacket;
        }
 }
 

Index: ConnectionHandlerComparator.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandlerComparator.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- ConnectionHandlerComparator.java    20 Jul 2003 08:03:35 -0000      1.8
+++ ConnectionHandlerComparator.java    4 Oct 2003 01:16:55 -0000       1.9
@@ -72,7 +72,7 @@
                                        case PEER_IDENTITY:
                                                return 
secondaryCompare(iSign,ch1.peerIdentity().toString().compareTo(ch2.peerIdentity().toString()),ch1,ch2);
                                        case SENDING_COUNT:
-                                               return secondaryCompare(iSign,new 
Integer(ch1.sendingCount()).compareTo(new Integer(ch2.sendingCount())),ch1,ch2);
+                                               return secondaryCompare(iSign,new 
Integer(ch1.sending() ? 1 : 0).compareTo(new Integer(ch2.sending() ? 1 : 0)),ch1,ch2);
                                        case SENDQUEUE:
                                                return secondaryCompare(iSign,new 
Long(ch1.sendQueueSize()).compareTo(new Long(ch2.sendQueueSize())),ch1,ch2);  
                                        case RECEIVING:

Index: Core.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Core.java,v
retrieving revision 1.42
retrieving revision 1.43
diff -u -r1.42 -r1.43
--- Core.java   10 Sep 2003 21:19:51 -0000      1.42
+++ Core.java   4 Oct 2003 01:16:55 -0000       1.43
@@ -531,19 +531,19 @@
      *                                    or a new node did not respond to 
      *                                    handshake.
      */
-    public final ConnectionHandler makeConnection(Peer p)
-       throws CommunicationException {
-        return makeConnection(p, 0);
-    }
+//     public final ConnectionHandler makeConnection(Peer p)
+//     throws CommunicationException {
+//         return makeConnection(p, 0);
+//     }
     
     /**
      * Returns an open connection to a node, or null if there aren't any
      * free. Will ONLY take them from the cache, will not create new ones.
      */
-    public final ConnectionHandler getConnection(Peer p)
-       throws CommunicationException {
-       return connections.findFreeConnection(p.getIdentity());
-    }
+//     public final ConnectionHandler getConnection(Peer p)
+//     throws CommunicationException {
+//     return connections.findFreeConnection(p.getIdentity());
+//     }
     
     /**
      * Returns an open connection to a node, either by making a new one
@@ -553,39 +553,32 @@
      *                 ConnectFailedException when establishing new 
      *                 connections.
      */
-    public final ConnectionHandler makeConnection(Peer p, long timeout) 
-                                        throws CommunicationException {
-        //if (connections == null)
-        //    throw new CoreException("Core not begun");
-        return connections.getConnection(this, p, timeout);
-    }
-    
-    
+//     public final ConnectionHandler makeConnection(Peer p, long timeout) 
+//                                         throws CommunicationException {
+//         //if (connections == null)
+//         //    throw new CoreException("Core not begun");
+//         return connections.getConnection(this, p, timeout);
+//     }
+
     
     /**
      * Send the message using the appropriate protocol over a free or 
      * new connection.
      */
-    //public OutputStream sendMessage(Message m, Peer destination, long timeout) 
-    //                                            throws CommunicationException {
-    //    try {
-    //        return makeConnection(destination, timeout).sendMessage(m);
-    //    } catch (ConnectFailedException e) {
-    //        throw new SendFailedException(e);
-    //    }
-    //}
+    public final TrailerWriter sendMessage(Message m, Peer p, long timeout) 
+       throws CommunicationException {
+       return connections.sendMessage(m, p.getIdentity(), null, timeout,
+                                      PeerHandler.NORMAL);
+    }
 
     /**
      * Send the message using the appropriate protocol over a free or 
      * new connection.
      */
-    //public OutputStream sendMessage(Message m, Peer destination) 
-    //                            throws CommunicationException {
-    //    return sendMessage(m, destination, 0);
-    //}
-    
-
-
+    public final TrailerWriter sendMessage(Message m, Peer p)
+       throws CommunicationException {
+        return sendMessage(m, p, 0);
+    }
     
     // Digest for signatures
     private static final Digest ctx = SHA1.getInstance();

Index: MessageSendCallback.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/MessageSendCallback.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- MessageSendCallback.java    8 Sep 2003 17:03:03 -0000       1.2
+++ MessageSendCallback.java    4 Oct 2003 01:16:55 -0000       1.3
@@ -4,5 +4,5 @@
 import java.io.OutputStream;
 
 public interface MessageSendCallback extends ExceptionCallback {
-    void setTrailerStream(OutputStream os); // must be called BEFORE the terminal 
success() or thrown()
+    void setTrailerWriter(TrailerWriter tw); // must be called BEFORE the terminal 
success() or thrown()
 }

Index: OpenConnectionManager.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/OpenConnectionManager.java,v
retrieving revision 1.111
retrieving revision 1.112
diff -u -r1.111 -r1.112
--- OpenConnectionManager.java  3 Oct 2003 08:35:19 -0000       1.111
+++ OpenConnectionManager.java  4 Oct 2003 01:16:56 -0000       1.112
@@ -47,8 +47,11 @@
 
     private final ThreadFactory tf;
     private final MultiValueTable chs;
-       private final Hashtable connectionJobs = new Hashtable();
        // One attempt to open a conn at once to each Peer
+       private final Hashtable connectionJobs = new Hashtable();
+       
+       // Map of Identity -> PeerHandler
+       private final Hashtable peerHandlers = new Hashtable();
        
     private int openConns;
        private Object openConnsSync = new Object();
@@ -56,8 +59,6 @@
     private LinkedList closedList = new LinkedList();
     
     private final LRUQueue lru = new LRUQueue();
-    // Queue of blacklisted peers
-    private final BlackLRUQueue blq = new BlackLRUQueue(60000,100,3);
     private int maxConnections = -1;
     
        private boolean logDEBUG = true;
@@ -76,14 +77,112 @@
         chs = new MultiValueTable(50, 3);
         openConns = 0;
     }
-
+       
+       public int countPeerHandlers() {
+               return peerHandlers.size();
+       }
+       
+//     public PeerHandler makePeerHandler(Peer p) {
+//             Identity i = p.getIdentity();
+//             synchronized(peerHandlers) {
+//                     PeerHandler ph = (PeerHandler)(peerHandlers.get(i));
+//                     if(ph != null) return ph;
+//                     ph = new PeerHandler(i, Main.node, 
+//                                                              
Main.node.getMaxPacketLength());
+//                     peerHandlers.put(p, ph);
+//                     return ph;
+//             }
+//     }
+       
+       /**
+        * Get the NodeReference for the identity given, if it is available,
+        * from the peer handlers.
+        */
+       public NodeReference getNodeReference(Identity id) {
+               synchronized(peerHandlers) {
+                       PeerHandler ph = (PeerHandler)(peerHandlers.get(id));
+                       if(ph == null) return null;
+                       return ph.getReference();
+               }
+       }
+       
+       /**
+        * Add or update a peerhandler, called by Node.reference.
+        */
+       public PeerHandler addPeer(Identity id, NodeReference nr) {
+               synchronized(peerHandlers) {
+                       PeerHandler ph = (PeerHandler)(peerHandlers.get(id));
+                       if(ph != null) {
+                               ph.updateReference(nr);
+                       } else {
+                               ph = new PeerHandler(id, nr, Main.node,
+                                                                        
Main.node.getMaxPacketLength());
+                               peerHandlers.put(id, ph);
+                       }
+                       return ph;
+               }
+       }
+       
+       /**
+        * Does the connection specified need to open a new connection?
+        * If this returns false, messages will not be routed to that 
+        * identity.
+        */
+       public boolean needsConnection(Identity i) {
+               synchronized(peerHandlers) {
+                       PeerHandler ph = (PeerHandler)(peerHandlers.get(i));
+                       if(ph == null) return false;
+                       return ph.needsConnection();
+               }
+       }
+       
+       private void removePeerHandler(Peer p) {
+               synchronized(peerHandlers) {
+                       peerHandlers.remove(p);
+               }
+       }
+       
+       public void addPeerHandler(Identity i, PeerHandler ph, 
+                                                          boolean allowedToFail) {
+               synchronized(peerHandlers) {
+                       Object oph = peerHandlers.get(i);
+                       if(oph != null) {
+                               if(!allowedToFail)
+                                       Core.logger.log(this, "Not replacing "+oph+
+                                                                       " with "+ph+" 
for "+i,
+                                                                       new 
Exception("debug"),
+                                                                       Logger.NORMAL);
+                       } else
+                               peerHandlers.put(i, ph);
+               }
+       }
+       
     /**
      * This method is package only, meant only for ConnectionHandler
      * objects to add themselves.
      */
     void put(ConnectionHandler ch) {
                Identity i = ch.peerIdentity();
-               if(i == null) throw new IllegalArgumentException("Must have a 
peerIdentity");
+               if(i == null) 
+                       throw new IllegalArgumentException("Must have a peerIdentity");
+               // Find Peer for ch
+               PeerHandler ph;
+               synchronized(peerHandlers) {
+                       // REDFLAG: with sessionv2 we will know the Peer from 
negotiation
+                       ph = (PeerHandler)(peerHandlers.get(i));
+                       if(ph == null) {
+                               // Hrrrrrrm
+                               // Hopefully this won't result in a put()!
+                               NodeReference ref = ch.targetReference();
+                               if(ref == null)
+                                       ref = Main.node.rt.getNodeReference(i);
+                               ph = new PeerHandler(i, ref, Main.node, 
+                                                                        
Main.node.getMaxPacketLength());
+                               peerHandlers.put(i, ph);
+                       }
+               }
+               // ph != null now
+               ch.setPeerHandler(ph); // will register on ph
                Object syncOb = chs.getSync(i);
                if(syncOb == null) {
                        // FIXME: adequately synchronized? I _think_ so because of 
chs's internal synchronization...
@@ -112,9 +211,29 @@
         KillSurplusConnections();
     }
     
+       public PeerHandler makePeerHandler(Identity i, NodeReference ref) {
+               synchronized(peerHandlers) {
+                       PeerHandler ph = (PeerHandler)(peerHandlers.get(i));
+                       if(ph != null) return ph;
+                       if(ref == null)
+                               ref = Main.node.rt.getNodeReference(i);
+                       ph = new PeerHandler(i, ref, Main.node, 
+                                                                
Main.node.getMaxPacketLength());
+                       peerHandlers.put(i, ph);
+                       return ph;
+               }
+       }
+       
+       public PeerHandler getPeerHandler(Identity id) {
+               synchronized(peerHandlers) {
+                       return (PeerHandler)(peerHandlers.get(id));
+               }
+       }
+       
     // Removing a connection that isn't in the OCM is a 
     // legal NOP.
     ConnectionHandler remove(ConnectionHandler ch) {
+               // CH.terminate() will remove from PeerHandler
         //if (ch.peerIdentity() == null) return null;
                Identity id = ch.peerIdentity();
                Object syncOb = chs.getSync(id);
@@ -147,89 +266,88 @@
      * This will return an open and not busy ConnectionHandler to the 
      * node identified.
      */
-    public ConnectionHandler findFreeConnection(Identity id) {
-               logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
-               if (logDEBUG)
-               Core.logger.log(this, "findFreeConnection("+id+")",
-                                               new Exception("debug"), Logger.DEBUG);
-               Enumeration e = chs.getAll(id); //moved this out here --zab
-               ConnectionHandler confirmed = null; // if we find one that is 
instantly suitable...
-               HashSet candidates = new HashSet(); //put sending() && 
!reallySending() CHs here
-               //so that we iterate over smaller collections
-               Object syncOb = chs.getSync(id);
-               if(syncOb == null) return null;
-               synchronized(syncOb) {
-                       while (e.hasMoreElements()) {
-                               ConnectionHandler res = (ConnectionHandler) 
e.nextElement();
-                               if (!res.isOpen()) {
-                                       if(logDEBUG)
-                                               Core.logger.log(this, "Skipped closed 
connection "+res,
-                                                                               
Logger.MINOR);
-                                       // It will be terminated eventually
-                                       // Do not remove it from OCM because it will 
then be orphaned and take up a fd even though it is not available for sending.
-                               } else if (!res.sending()) {
-                                       if (logDEBUG)
-                                               Core.logger.log(this, "Found "+res, 
Logger.MINOR);
-                                       confirmed = res;
-                                       break;
-                               } else {
-                                       if(!res.reallySending()){
-                                               candidates.add(res);
-                                       } else if (logDEBUG) {
-                                               Core.logger.log(this, "Skipping: 
"+res+": sending",
-                                                                               
Logger.MINOR);
-                                       }
-                               }
-                       }
-               }
-               if(confirmed != null) {
-                       // found one
-                       lru.push(confirmed);
-                       // Mark this connection as cached
-                       // so we know to discount it in 
-                       // connection success accounting in 
-                       // the Routing implementation.
-                       confirmed.setCached(true);
-                       return confirmed;
-               }
-               /*
-                * imho this should be 1, not 2.  by opening new connections too often
-                * you starve the trailers and other messages and end up with a 
feedback loop
-                * I'll test it locally with 1 and report the results.
-                * --zab
-                * Probably. Although it'd be nice to have a spare open for a new 
trailer.
-                * Also possible problems with closed connections.
-                * --amphibian
-                */
-               if(candidates.size() < 1) {
-                       Main.node.scheduleConnectionOpener(id);
-                       // open another one
-               }
-               if (candidates.size() == 0) //shortcut
-                       return null; //we may want to notify the user we've scheduled 
to open a new conn
+//     public ConnectionHandler findFreeConnection(Identity id) {
+//             logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
+//             if (logDEBUG)
+//             Core.logger.log(this, "findFreeConnection("+id+")",
+//                                             new Exception("debug"), Logger.DEBUG);
+//             Enumeration e = chs.getAll(id); //moved this out here --zab
+//             ConnectionHandler confirmed = null; // if we find one that is 
instantly suitable...
+//             HashSet candidates = new HashSet(); //put sending() && 
!reallySending() CHs here
+//             //so that we iterate over smaller collections
+//             Object syncOb = chs.getSync(id);
+//             if(syncOb == null) return null;
+//             synchronized(syncOb) {
+//                     while (e.hasMoreElements()) {
+//                             ConnectionHandler res = (ConnectionHandler) 
e.nextElement();
+//                             if (!res.isOpen()) {
+//                                     if(logDEBUG)
+//                                             Core.logger.log(this, "Skipped closed 
connection "+res,
+//                                                                             
Logger.MINOR);
+//                                     // It will be terminated eventually
+//                                     // Do not remove it from OCM because it will 
then be orphaned and take up a fd even though it is not available for sending.
+//                             } else if (!res.sending()) {
+//                                     if (logDEBUG)
+//                                             Core.logger.log(this, "Found "+res, 
Logger.MINOR);
+//                                     confirmed = res;
+//                                     break;
+//                             } else {
+//                                     if(!res.reallySending()){
+//                                             candidates.add(res);
+//                                     } else if (logDEBUG) {
+//                                             Core.logger.log(this, "Skipping: 
"+res+": sending",
+//                                                                             
Logger.MINOR);
+//                                     }
+//                             }
+//                     }
+//             }
+//             if(confirmed != null) {
+//                     // found one
+//                     lru.push(confirmed);
+//                     // Mark this connection as cached
+//                     // so we know to discount it in 
+//                     // connection success accounting in 
+//                     // the Routing implementation.
+//                     confirmed.setCached(true);
+//                     return confirmed;
+//             }
+//             /*
+//              * imho this should be 1, not 2.  by opening new connections too often
+//              * you starve the trailers and other messages and end up with a 
feedback loop
+//              * I'll test it locally with 1 and report the results.
+//              * --zab
+//              * Probably. Although it'd be nice to have a spare open for a new 
trailer.
+//              * Also possible problems with closed connections.
+//              * --amphibian
+//              */
+//             if(candidates.size() < 1) {
+//                     Main.node.scheduleConnectionOpener(id);
+//                     // open another one
+//             }
+//             if (candidates.size() == 0) //shortcut
+//                     return null; //we may want to notify the user we've scheduled 
to open a new conn
                
-               ConnectionHandler best = null;
-               int bestVal = Integer.MAX_VALUE;
-               for (Iterator i = candidates.iterator() ; i.hasNext() ; ) {
-                       ConnectionHandler res = (ConnectionHandler) i.next();
-                       if(res.useValue() < bestVal) {
-                               best = res;
-                               bestVal = res.useValue();
-                       }
-                       
-               }
-               if(best != null) {
-                       synchronized(lru) {
-                               lru.push(best); // sync(lru)
-                               best.setCached(true);
-                       }
-                       return best;
-               }
-               if(logDEBUG)
-                       Core.logger.log(this, "Couldn't find open connection for "+
-                                                       id, Logger.DEBUG);
-               return null;
-    }
+//             ConnectionHandler best = null;
+//             int bestVal = Integer.MAX_VALUE;
+//             for (Iterator i = candidates.iterator() ; i.hasNext() ; ) {
+//                     ConnectionHandler res = (ConnectionHandler) i.next();
+//                     if(res.useValue() < bestVal) {
+//                             best = res;
+//                             bestVal = res.useValue();
+//                     }
+//             }
+//             if(best != null) {
+//                     synchronized(lru) {
+//                             lru.push(best); // sync(lru)
+//                             best.setCached(true);
+//                     }
+//                     return best;
+//             }
+//             if(logDEBUG)
+//                     Core.logger.log(this, "Couldn't find open connection for "+
+//                                                     id, Logger.DEBUG);
+//             return null;
+//     }
     
     public void markClosed(ConnectionHandler ch) {
         synchronized(closedList) {
@@ -243,33 +361,33 @@
      * smallest value of sendQueueSize() divided by the preference value of
      * the transport.
      */
-    private ConnectionHandler findBestConnection(Identity id) {
-        ConnectionHandler res, best = null;
-        double resval, bestval = Double.POSITIVE_INFINITY;
-               Object syncOb = chs.getSync(id);
-               if(syncOb == null) return null;
-               synchronized(syncOb) {
-                       for (Enumeration e = chs.getAll(id) ; e.hasMoreElements() ; ) {
-                               res = (ConnectionHandler) e.nextElement();
-                               if (!res.isOpen()) {
-                                       if(logDEBUG)
-                                               Core.logger.log(this, "Skipped closed 
connection "+res,
-                                                                               
Logger.DEBUG);
-                                       // It will be terminated eventually
-                                       // Do not remove it from OCM because it will 
then be orphaned and take up a fd even though it is not available for sending.
-                               } else {
-                                       resval = res.sendQueueSize() / 
-                                               res.transport().preference();
+//     private ConnectionHandler findBestConnection(Identity id) {
+//         ConnectionHandler res, best = null;
+//         double resval, bestval = Double.POSITIVE_INFINITY;
+//             Object syncOb = chs.getSync(id);
+//             if(syncOb == null) return null;
+//             synchronized(syncOb) {
+//                     for (Enumeration e = chs.getAll(id) ; e.hasMoreElements() ; ) {
+//                             res = (ConnectionHandler) e.nextElement();
+//                             if (!res.isOpen()) {
+//                                     if(logDEBUG)
+//                                             Core.logger.log(this, "Skipped closed 
connection "+res,
+//                                                                             
Logger.DEBUG);
+//                                     // It will be terminated eventually
+//                                     // Do not remove it from OCM because it will 
then be orphaned and take up a fd even though it is not available for sending.
+//                             } else {
+//                                     resval = res.sendQueueSize() / 
+//                                             res.transport().preference();
                                        
-                                       if (resval < bestval) {
-                                               bestval = resval;
-                                               best = res;
-                                       }
-                               }
-                       }
-                       return best;
-               }
-    }
+//                                     if (resval < bestval) {
+//                                             bestval = resval;
+//                                             best = res;
+//                                     }
+//                             }
+//                     }
+//                     return best;
+//             }
+//     }
     
     public int countConnections(Identity id) {
                Object syncOb = chs.getSync(id);
@@ -326,13 +444,6 @@
                }
     }
     
-    public void blackList(Peer p){
-        Core.logger.log(this, "blackListing " + p, Logger.DEBUG);
-        if ( p != null && p.getIdentity() != null && p.getAddress() != null ) {
-            blq.push(p);
-        }
-    }    
-    
     /**
      * Creates a new Connection which is started and added.
      * @param c     The Core to connect from
@@ -355,21 +466,6 @@
                boolean updatedRefcount = false;
                
                boolean weStarted = false;
-        blq.clean();
-        Core.logger.log(this, "Current blackListQueue size: " + blq.size() +
-                        ", Checking " + p, 
-                        Logger.DEBUG);
-        if ( blq.isBlackListed(p) ) {
-            Core.logger.log(this, "Attempted to open connection for blackListed " +
-                            p,Logger.DEBUG);
-            ConnectFailedException e = 
-                new ConnectFailedException(p.getAddress(), 
-                                           p.getIdentity(),
-                                           "BlackListed",
-                                           true);
-            Core.logger.log(this, "Failed to connect: " + e, Logger.DEBUG);
-            throw e;
-        }
                try {
                        synchronized(connectionJobs) {
                 if ( ( ct = (ConnectionJob)connectionJobs.get(p) ) != null ) {
@@ -383,7 +479,7 @@
                         ct.incRefcount();
                     }
                 }
-                    
+                               
                                if(ct == null) {
                                        weStarted = true;
                                        ct = new ConnectionJob(c, p);
@@ -472,6 +568,25 @@
                }
     }
     
+       public TrailerWriter sendMessage(Message m, Identity i, NodeReference ref, 
+                                                                        long timeout, 
int msgPrio) 
+               throws SendFailedException {
+               return makePeerHandler(i, ref).sendMessage(m, timeout, msgPrio);
+       }
+       
+       public void sendMessageAsync(Message m, Identity i, NodeReference ref,
+                                                                MessageSendCallback 
cb, int msgPrio) 
+               throws SendFailedException {
+               makePeerHandler(i, ref).sendMessageAsync(m, cb, msgPrio);
+       }
+       
+       public void unsendMessage(Identity i, MessageSendCallback cb) {
+               PeerHandler ph = getPeerHandler(i);
+               if(ph != null) {
+                       ph.unsendMessage(cb);
+               }
+       }
+       
        /**
         * Kills off one or more connections to ensure that we stay below the 
connection limit
         * This method is appropriate to call after a connection has been added to the 
OCM 
@@ -546,26 +661,27 @@
             }
         }
     }
+       
     /**
      * Returns a free connection, making a new one if none is available.
      */
-    public ConnectionHandler getConnection(Core c, Peer p, long timeout) 
-               throws CommunicationException {
+//     public ConnectionHandler getConnection(Core c, Peer p, long timeout) 
+//             throws CommunicationException {
 
-        // Let the race begin!
-        // If non-null it was open and free when findFreeConnection returned
-        // but nothing guarantees that some other thread won't have sent a
-        // message with a huge trailing field by the time you actually
-        // try to send a message on it.
-               Core.logger.log(this, "getConnection("+p+","+timeout+")",
-                                               Logger.DEBUG);
-        ConnectionHandler ch = findFreeConnection(p.getIdentity()); 
+//         // Let the race begin!
+//         // If non-null it was open and free when findFreeConnection returned
+//         // but nothing guarantees that some other thread won't have sent a
+//         // message with a huge trailing field by the time you actually
+//         // try to send a message on it.
+//             Core.logger.log(this, "getConnection("+p+","+timeout+")",
+//                                             Logger.DEBUG);
+//         ConnectionHandler ch = findFreeConnection(p.getIdentity()); 
                
-        if (ch == null) {
-            ch = createConnection(c, p, timeout);
-        }
-        return ch;
-    }
+//         if (ch == null) {
+//             ch = createConnection(c, p, timeout);
+//         }
+//         return ch;
+//     }
 
     /**
      * Gives the number of registered open connections.
@@ -714,11 +830,11 @@
                                String imageURL = "/servlet/images/aqua/arrow";
                                if(ch.outbound) imageURL += "_outbound";
                                else imageURL += "_inbound";
-                               if(ch.receiving() && ch.reallySending())
+                               if(ch.receiving() && ch.sending()) //Can this happen 
yet?
                                        imageURL += "_both";
                                else
                                        if(ch.receiving()) imageURL += "_receiving";
-                                       else if(ch.reallySending()) imageURL += 
"_transmitting";
+                                       else if(ch.sending()) imageURL += 
"_transmitting";
                                        else imageURL += "_sleeping";
                                imageURL += ".png";
                                buffer.append("<center><img src='" + imageURL + "' 
height = '15' width = '24'>" + 
@@ -777,7 +893,7 @@
                        //int x = ch.sendingCount();
                        //if(x > 0) sending++;;
                        //buffer.append(ch.sendingCount() + sep); Remove until there 
can actually be other values than 0 and 1 here 
-                       if(ch.reallySending())
+                       if(ch.sending())
                                sending++;
                        
                        if(ch.receiving())
@@ -791,7 +907,7 @@
                        
                        buffer.append(sepAlignRight+ch.messages());
                        if(useOldStyle || viewLevel > 0){
-                               if(ch.reallySending())
+                               if(ch.sending())
                                        buffer.append
                                                (sepAlignRight + 
                                                 (viewLevel > 1
@@ -826,7 +942,7 @@
                                          ).replaceAll(" ","&nbsp;")
                                         );
                        }else{
-                               if(ch.reallySending()||ch.receiving())
+                               if(ch.sending()||ch.receiving())
                                        
buffer.append(sepAlignRight+format(ch.sendQueueSize()));
                                else
                                        buffer.append(sepAlignRight+"-");
@@ -1281,13 +1397,14 @@
                                        
                                        if(logDEBUG) Core.logger.log(this, "configged 
WSL for "+
                                                                                       
          ch+" ("+this+")", Logger.DEBUG);
-                    if (!core.hasInterfaceFor(ch.transport())) {
-                        // if we don't have an interface for this transport, we
-                        // will ask this connection to persist.
-                        Message m = ch.presentationType().getSustainMessage();
-                        if (m != null)
-                            ch.sendMessage(m);
-                    }
+                                       // IIRC we don't use sustain anymore: FIXME -- 
amphibian
+//                     if (!core.hasInterfaceFor(ch.transport())) {
+//                         // if we don't have an interface for this transport, we
+//                         // will ask this connection to persist.
+//                         Message m = ch.presentationType().getSustainMessage();
+//                         if (m != null)
+//                             ch.sendMessage(m);
+//                     }
                    
                                        long now = System.currentTimeMillis();
                                        long connectingTime = now - start;

Index: PeerHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerHandler.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- PeerHandler.java    8 Sep 2003 17:03:03 -0000       1.1
+++ PeerHandler.java    4 Oct 2003 01:16:56 -0000       1.2
@@ -1,180 +1,382 @@
+/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
 package freenet;
 import java.util.*;
 import java.io.OutputStream;
 import freenet.Core;
 import freenet.node.Node;
+import freenet.node.NodeReference;
 import freenet.session.Link;
 import freenet.support.Logger;
 
-class PeerHandler {
-    Peer peer;
+/**
+ * PeerHandler
+ *
+ * Class managing and representing our communications with a remote peer.
+ * Contains message queue, connection and backoff logic, etc.
+ * Despite the name, a PeerHandler is unique by its Identity, and it keeps
+ * a NodeReference for purposes of opening connections.
+ */
+public class PeerHandler {
+    final Identity id; // the node's identity - never changes
     
-    LinkedList messages;
+    NodeReference ref; // the current reference, can be null, meaning we can only 
talk back over open conns
     
-    LinkedList connectionHandlers;
+    final LinkedList messages;
     
-    Node node;
+    final LinkedList connectionHandlers;
+       
+    final Node node;
     
     int maxPacketSize;
-
-    public PeerHandler(Peer p, Node n, int maxPacketSize) {
-       this.peer = p;
-       this.node = n;
-       this.maxPacketSize = maxPacketSize;
+       
+       // Packet priorities
+    public static final int EXPENDABLE = 0;
+    public static final int NORMAL = 1;
+    
+       public String toString() {
+               return super.toString() + " ("+id+")";
+       }
+       
+       /**
+        * Register a ConnectionHandler to us. This should be called as soon as
+        * it has completed negotiations, not waiting for the Identify.
+        */
+    public void registerConnectionHandler(ConnectionHandler ch) {
+               Core.logger.log(this, "Registering "+ch+" on "+this,
+                                               Logger.MINOR);
+               synchronized(connectionHandlers) {
+                       connectionHandlers.add(ch);
+               }
+    }
+    
+       /**
+        * Unregister a ConnectionHandler. This should be called after we
+        * have lost the ability to send and receive messages on the CH.
+        */
+    public void unregisterConnectionHandler(ConnectionHandler ch) {
+               Core.logger.log(this, "Unregistering "+ch+" on "+this,
+                                               Logger.MINOR);
+               synchronized(connectionHandlers) {
+                       connectionHandlers.remove(ch);
+               }
+               if(connectionHandlers.size() == 0 &&
+                  ref == null && !messages.isEmpty()) {
+                       Core.logger.log(this, "Lost all connections for "+id+
+                                                       ", but "+messages.size()+" 
messages left",
+                                                       Logger.NORMAL);
+               }
+    }
+    
+       /**
+        * Set the NodeReference. If the new NodeReference supersedes the old one,
+        * it will be set. This is the same logic used in the routing table.
+        * Should be called amonst other occasions by ConnectionHandler when we
+        * receive an Identify message.
+        * @return the current NodeReference
+        */
+       public NodeReference updateReference(NodeReference nr) {
+               if(nr.supersedes(ref)) ref = nr;
+               return ref;
+       }
+       
+       public NodeReference getReference() {
+               return ref;
+       }
+       
+       /**
+        * Do we need to open a new connection?
+        * If this returns false, messages will not be routed to that 
+        * identity (see *Routing).
+        */
+    public boolean needsConnection() {
+               if(messages.isEmpty() &&
+                  id == null || (!(node.rt.references(id)))) return false;
+               // FIXME: layering
+               
+               synchronized(connectionHandlers) {
+                       for(Iterator e = connectionHandlers.listIterator(0);
+                               e.hasNext();) {
+                               ConnectionHandler ch = (ConnectionHandler)(e.next());
+                               if(!ch.isOpen()) continue;
+                               if(ch.isSendingPacket()) continue;
+                               return false;
+                       }
+               }
+               return true;
+    }
+    
+       public void unsendMessage(PeerPacketMessage pm) {
+               // FIXME: speed up - hashtable?
+               synchronized(messages) {
+                       for(Iterator i = messages.listIterator(0); i.hasNext();) {
+                               PeerPacketMessage cmp = (PeerPacketMessage)(i.next());
+                               if(cmp == pm) {
+                                       i.remove();
+                                       return;
+                               }
+                       }
+               }
+       }
+       
+       public void unsendMessage(MessageSendCallback cb) {
+               // FIXME: speed up - hashtable?
+               synchronized(messages) {
+                       for(Iterator i = messages.listIterator(0); i.hasNext();) {
+                               PeerPacketMessage cmp = (PeerPacketMessage)(i.next());
+                               MessageSendCallback cmpCB = cmp.cb;
+                               if(cb == cmpCB) {
+                                       i.remove();
+                                       return;
+                               }
+                       }
+               }
+       }
+       
+    /**
+     * Create a PeerHandler
+     * @param id the identity of the peer. Null for an FCP connection.
+     * @param ref the initial NodeReference of the peer - can be null, meaning we
+     * don't know yet, but we can reply over currently open conns.
+     * @param n the Node.
+     * @param maxPacketSize the maximum length allowable for sending packets.
+     */
+    public PeerHandler(Identity id, NodeReference ref, Node n, int maxPacketSize) {
+               this.id = id;
+               this.ref = ref;
+               this.node = n;
+               this.maxPacketSize = maxPacketSize;
+               messages = new LinkedList();
+               connectionHandlers = new LinkedList();
     }
     
     /**
      * Start an asynchronous message send
      * Call the callback provided when it is finished
      */
-    synchronized void sendMessageAsync(RawMessage r, MessageSendCallback cb) 
-       throws SendFailedException {
-       PeerPacketMessage pm = new PeerPacketMessage(peer, r, cb);
-       throw new UnsupportedOperationException("PeerHandler.sendMessageAsync");
-       /* Possibilities:
-        *
-        * 1. No ConnectionHandlers (without trailing fields) open.
-        * Strategy: open one, then send message
-        * 2. Idle ConnectionHandlers
-        * Strategy: pick one, make it send a single message packet
-        * 3. No Idle ConnectionHandlers
-        * Strategy: wait for one to ask us for a packet
-        */
-//     int handlersSendingPackets = 0;
-//     for(Iterator e = connectionHandlers.listIterator(0);
-//         e.hasNext();) {
-//         ConnectionHandler ch =
-//             (ConnectionHandler)(e.next());
-//         if(!ch.isOpen()) {
-//             // Can't send messages
-//             Core.logger.log(this, ch.toString()+" can't send messages",
-//                             Logger.DEBUG);
-//             e.remove();
-//             continue;
-//         }
-// //      if(ch.isSendingPacket()) {
-// //          handlersSendingPackets++;
-// //      }
-//         // Not busy!
-//         sendSinglePacket(ch, pm);
-//         return;
-//         // No suitable ConnectionHandlers found!
-//         // Queue the message
-//         messages.addLast(pm);
-//         if(handlersSendingPackets > 0) {
-//             // One of them will call us
-//         } else {
-//             // Uh oh...
-//             node.scheduleConnectionOpener(peer.getIdentity());
-//             // When it registers with OCM, it will getPacket()
-//         }
-//     }
-    }
+    public synchronized void sendMessageAsync(Message r, MessageSendCallback cb,
+                                                                                      
   int msgPrio)
+               throws SendFailedException {
+               PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio);
+               innerSendMessageAsync(pm);
+       }
+       
+       protected synchronized void innerSendMessageAsync(PeerPacketMessage pm) {
+               /* Possibilities:
+                *
+                * 1. No ConnectionHandlers (without trailing fields) open.
+                * Strategy: open one, then send message
+                * 2. Idle ConnectionHandlers
+                * Strategy: pick one, make it send a single message packet
+                * 3. No Idle ConnectionHandlers
+                * Strategy: wait for one to ask us for a packet
+                */
+               Core.logger.log(this, "Sending "+pm+" on "+this,
+                                               Logger.DEBUG);
+               int handlersSendingPackets = 0;
+               synchronized(connectionHandlers) {
+                       for(Iterator e = connectionHandlers.listIterator(0);
+                               e.hasNext();) {
+                               ConnectionHandler ch =
+                                       (ConnectionHandler)(e.next());
+                               if(!ch.isOpen()) {
+                                       // Can't send messages
+                                       Core.logger.log(this, ch.toString()+
+                                                                       " can't send 
messages for "+pm+" on "+
+                                                                       this, 
Logger.DEBUG);
+                                       e.remove();
+                                       continue;
+                               }
+                               if(ch.isSendingPacket()) {
+                                       Core.logger.log(this, ch.toString()+
+                                                                       " already 
sending a packet for "+pm+
+                                                                       " on "+this, 
Logger.DEBUG);
+                                       handlersSendingPackets++;
+                                       continue;
+                               }
+                               // Not busy!
+                               Core.logger.log(this, ch.toString()+
+                                                               ": send one packet 
"+pm+" for "+this,
+                                                               Logger.DEBUG);
+                               sendSinglePacket(ch, pm);
+                               return;
+                       }
+               }
+               // No suitable ConnectionHandlers found!
+               // Queue the message
+               Core.logger.log(this, "Queueing "+pm+" on "+this,
+                                               Logger.DEBUG);
+               if(id == null && handlersSendingPackets == 0) {
+                       // Can't open a connection
+                       pm.notifyFailure(null);
+                       Core.logger.log(this, "Failed to send packet, no more conns: "+
+                                                       pm+" on "+this, Logger.DEBUG);
+                       return;
+               }
+               synchronized(messages) {
+                       messages.addLast(pm);
+               }
+               if(handlersSendingPackets > 0) {
+                       // One of them will call us
+               } else {
+                       // Uh oh...
+                       if(id != null)
+                               node.scheduleConnectionOpener(id);
+                       // When it registers with OCM, it will getPacket()
+               }
+               Core.diagnostics.occurrenceContinuous("messageSendQueueSize",
+                                                                                      
   messages.size());
+       }
     
     /**
-     * Send one packet on one ConnectionHandler
+     * Send one message on one ConnectionHandler.
+        * @param ch the connection on which to send the message
+        * @param pm the single message to send
      */
     protected void sendSinglePacket(ConnectionHandler ch, 
-                                   PeerPacketMessage pm) {
-       PeerPacketMessage[] msgs = new PeerPacketMessage[] { pm };
-       PeerPacket packet = new PeerPacket(msgs, ch.getLink());
-       //ch.sendPacket(packet);
-       throw new UnsupportedOperationException("sendSinglePacket");
+                                                                       
PeerPacketMessage pm) {
+               PeerPacketMessage[] msgs = new PeerPacketMessage[] { pm };
+               PeerPacket packet = new PeerPacket(msgs, ch.getLink(),
+                                                                                  
ch.presentationType());
+               ch.sendPacket(packet, freenet.transport.WriteSelectorLoop.MESSAGE);
     }
     
+       public final PeerPacket getPacket(Link link, Presentation p) {
+               return getPacket(link, p, null, null, false);
+       }
+       
     /**
      * Get a packet to send. Called by a ConnectionHandler.
      * It will then send it. synchronized so as to avoid isSendingPacket()
      * race with sendMessageAsync. Returns null if nothing to send.
+        * @param m link specific message to prepend to the packet, or null
+        * if not needed. Used for such hacks as the Identify message.
+        * @param i identity for the message - not needed unless we m != null.
+        * @param onlyGivenMsg if true, only include the given message, used for
+        * starting a close dialog.
      */
-    public synchronized PeerPacket getPacket(Link link) {
-       /** THE RULES
-        *
-        * Collect as many messages as will fit in one packet.
-        * If a message is bigger than the maximum, add it on to the packet 
-        * and send the oversized packet anyway.
-        * Stop if a message includes a trailing field.
-        */
-       int packetLength = 0;
-       LinkedList packetMessages = new LinkedList();
-       while(!messages.isEmpty()) {
-           PeerPacketMessage msg = 
-               (PeerPacketMessage)(messages.removeFirst());
-           if((msg.getLength() > maxPacketSize) || 
-              msg.hasTrailer()) {
-               packetMessages.addLast(msg);
-               packetLength += msg.getLength();
-               break;
-           }
-           if(msg.getLength() + packetLength >
-              maxPacketSize && (!packetMessages.isEmpty())) {
-               messages.addFirst(msg);
-               break;
-           }
-           packetLength += msg.getLength();
-       }
-       if(packetMessages.isEmpty()) return null;
-       else {
-           PeerPacketMessage[] msgs = 
-               new PeerPacketMessage[packetMessages.size()];
-           packetMessages.toArray(msgs);
-           return new PeerPacket(msgs, link);
-       }
+    public synchronized PeerPacket getPacket(Link link, Presentation p,
+                                                                                      
  Identity i, Message m,
+                                                                                      
  boolean onlyGivenMsg) {
+               Core.diagnostics.occurrenceContinuous("messageSendQueueSize",
+                                                                                      
   messages.size());
+               /** THE RULES
+                *
+                * Collect as many messages as will fit in one packet.
+                * If a message is bigger than the maximum, add it on to the packet 
+                * and send the oversized packet anyway.
+                * Stop if a message includes a trailing field.
+                */
+               int packetLength = 0;
+               LinkedList packetMessages = new LinkedList();
+               if(m != null) {
+                       packetMessages.add(new PeerPacketMessage(i, m, null, NORMAL));
+               }
+               if(!onlyGivenMsg) {
+                       synchronized(messages) {
+                               while(!messages.isEmpty()) {
+                                       PeerPacketMessage msg = 
+                                               
(PeerPacketMessage)(messages.removeFirst());
+                                       msg.resolve(p); // we need the length
+                                       if((msg.getLength() > maxPacketSize) || 
+                                          msg.hasTrailer()) {
+                                               packetMessages.addLast(msg);
+                                               packetLength += msg.getLength();
+                                               break;
+                                       }
+                                       if(msg.getLength() + packetLength >
+                                          maxPacketSize && 
(!packetMessages.isEmpty())) {
+                                               messages.addFirst(msg);
+                                               break;
+                                       }
+                                       packetLength += msg.getLength();
+                               }
+                       }
+               }
+               if(packetMessages.isEmpty()) return null;
+               else {
+                       PeerPacketMessage[] msgs = 
+                               new PeerPacketMessage[packetMessages.size()];
+                       packetMessages.toArray(msgs);
+                       return new PeerPacket(msgs, link, p);
+               }
     }
     
+       public TrailerWriter sendMessage(Message m) throws SendFailedException {
+               return sendMessage(m, 0, NORMAL);
+       }
+       
     /**
      * Send a message synchronously
      * @return a stream to write the trailing message to, or null
      * if the message does not have a trailing field.
      */
-    OutputStream sendMessage(RawMessage r) throws SendFailedException {
-       MyMessageSendCallback cb =
-           new MyMessageSendCallback();
-       sendMessageAsync(r, cb);
-       while(!cb.finished) {
-           try {
-               cb.wait();
-           } catch (InterruptedException e) {}
-       }
-       if(cb.e != null) {
-           if(cb.e instanceof SendFailedException) 
-               throw (SendFailedException)(cb.e);
-           else {
-               Core.logger.log(this, "Got unexpected exception: "+
-                               cb.e+" sending "+r, Logger.ERROR);
-               SendFailedException e = 
-                   new SendFailedException(peer.getAddress(), 
-                                           peer.getIdentity(),
-                                           "Unexpected exception "+
-                                           cb.e, false);
-               e.initCause(cb.e);
-               throw e;
-           }
-       } else
-           return cb.trailerStream;
+    public TrailerWriter sendMessage(Message r, long timeout,
+                                                                        int msgPrio) 
throws SendFailedException {
+               MyMessageSendCallback cb =
+                       new MyMessageSendCallback();
+               PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio);
+               innerSendMessageAsync(pm);
+               long timeoutAt = System.currentTimeMillis() + timeout;
+               synchronized(cb) {
+                       while(!cb.finished) {
+                               try {
+                                       long waitTime;
+                                       if(timeout > 0) {
+                                               waitTime = timeoutAt - 
System.currentTimeMillis();
+                                               if(waitTime < 0) break;
+                                       } else waitTime = 0;
+                                       cb.wait(waitTime);
+                               } catch (InterruptedException e) {}
+                       }
+               }
+               if(!cb.finished) {
+                       // Couldn't send message
+                       unsendMessage(pm);
+                       throw new SendFailedException(null, // FIXME?
+                                                                                 id, 
"Timed out sending message",
+                                                                                 
false);
+               }
+               if(cb.e != null) {
+                       if(cb.e instanceof SendFailedException) 
+                               throw (SendFailedException)(cb.e);
+                       else {
+                               Core.logger.log(this, "Got unexpected exception: "+
+                                                               cb.e+" sending "+r, 
Logger.ERROR);
+                               SendFailedException e = 
+                                       new SendFailedException(null, // FIXME?
+                                                                                      
 id, "Unexpected exception "+
+                                                                                      
 cb.e, false);
+                               e.initCause(cb.e);
+                               throw e;
+                       }
+               } else
+                       return cb.trailer;
     }
     
+       /**
+        * Helper class used to emulate blocking sends.
+        */
     class MyMessageSendCallback implements MessageSendCallback {
-       boolean finished = false;
-       Exception e = null;
-       OutputStream trailerStream = null;
-       public void succeeded() {
-           finished = true;
-           synchronized(this) {
-               this.notify();
-           }
-       }
-       
-       public void thrown(Exception e) {
-           this.e = e;
-           finished = true;
-           synchronized(this) {
-               this.notify();
-           }
-       }
+               boolean finished = false;
+               Exception e = null;
+               TrailerWriter trailer = null;
+               public void succeeded() {
+                       finished = true;
+                       synchronized(this) {
+                               this.notify();
+                       }
+               }
+               
+               public void thrown(Exception e) {
+                       this.e = e;
+                       finished = true;
+                       synchronized(this) {
+                               this.notify();
+                       }
+               }
        
-       public void setTrailerStream(OutputStream os) {
-           trailerStream = os;
-       }
+               public void setTrailerWriter(TrailerWriter tw) {
+                       trailer = tw;
+               }
     }
 }

Index: PeerPacket.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacket.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- PeerPacket.java     18 Sep 2003 17:48:04 -0000      1.2
+++ PeerPacket.java     4 Oct 2003 01:16:56 -0000       1.3
@@ -3,17 +3,19 @@
 import java.io.OutputStream;
 
 class PeerPacket {
+    Presentation p;
     PeerPacketMessage[] messages;
     byte[] encryptedData; // plaintext. Encrypt at send time.
     int sentBytes;
     
-    PeerPacket(PeerPacketMessage[] msgs, Link l) 
+    PeerPacket(PeerPacketMessage[] msgs, Link l, Presentation p)
        throws IllegalArgumentException {
        sentBytes = 0;
        messages = msgs;
        int totalLength = 0;
        for(int i=0;i<msgs.length;i++) {
            PeerPacketMessage m = msgs[i];
+           m.resolve(p);
            totalLength += m.getLength();
            if((i != (msgs.length-1)) && m.hasTrailer())
                throw new IllegalArgumentException("trailers can only be attached to 
the LAST message!");
@@ -39,6 +41,10 @@
        return encryptedData;
     }
     
+    public int countMessages() {
+       return messages.length;
+    }
+    
     /**
      * Notify clients of message send completion
      * @param finished whether the packet has finished sending. If false,
@@ -47,7 +53,7 @@
      * Can be zero. Bytes after this number might have been sent, but we got
      * an error and can't be sure.
      */
-    public void jobDone(boolean finished, int successfullySentBytes) {
+    public void jobDone(boolean finished, int successfullySentBytes, Peer sentTo) {
        int msgStartOffset = 0;
        // Could unfold this a bit by keeping the offset in the array on the object
        PeerPacketMessage prev = null;
@@ -71,19 +77,44 @@
                // Last message was not dealt with last time
                // prev != null because msgStartOffset > 0
                int prevLen = prev.getLength();
-               int diff = msgStartOffset - sentBytes;
-               if(finished || 
-                  ((successfullySentBytes - msgStartOffset) > prevLen))
-                   prev.notifyDone((successfullySentBytes - msgStartOffset) >
-                                   prevLen);
+               int diff = successfullySentBytes - msgStartOffset;
+               if(finished || diff >= prevLen) {
+                   if(diff >= prevLen) {
+                       prev.notifySuccess();
+                   } else {
+                       String excuse = "Sent "+
+                           (diff>=0 ? diff : 0) +
+                           " bytes ("+(getLength() - successfullySentBytes)+
+                           " of packet in notifyDone";
+                       SendFailedException sfe = 
+                           new SendFailedException(sentTo.getAddress(), 
+                                                   sentTo.getIdentity(),
+                                                   excuse, false);
+                       prev.notifyFailure(sfe);
+                   }
+               }
            }
            prev = messages[i];
        }
        if(prev != null) {
            int prevLen = prev.getLength();
            boolean success = (successfullySentBytes == encryptedData.length);
-           if(finished || success)
-               prev.notifyDone(success);
+           if(finished || success) {
+               if(success)
+                   prev.notifySuccess();
+               else {
+                   int diff = successfullySentBytes - msgStartOffset;
+                   String excuse = "Sent "+
+                       (diff>=0 ? diff : 0) +
+                           " bytes ("+(getLength() - successfullySentBytes)+
+                           " of packet in notifyDone";
+                       SendFailedException sfe = 
+                           new SendFailedException(sentTo.getAddress(), 
+                                                   sentTo.getIdentity(),
+                                                   excuse, false);
+                       prev.notifyFailure(sfe);
+               }
+           }
        }
     }
 }

Index: PeerPacketMessage.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacketMessage.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- PeerPacketMessage.java      8 Sep 2003 17:03:03 -0000       1.1
+++ PeerPacketMessage.java      4 Oct 2003 01:16:56 -0000       1.2
@@ -4,21 +4,56 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
+/**
+ * PeerPacketMessage - a message to be sent to a particular peer node.
+ * Unencrypted, not yet attached to any particular connection.
+ * Used in PeerHandler's message queue, and as a part of a PeerPacket.
+ */
 class PeerPacketMessage {
-    final RawMessage raw;
-    final byte[] content; // unencrypted - encrypt at PeerPacket level
+    final Message msg;
     final MessageSendCallback cb;
-    final Peer peer;
+    final Identity id;
     static ByteArrayOutputStream bais = new ByteArrayOutputStream();
+    boolean finished = false;
+    
+    final int priority;
+    
+    // These are created when we resolve(Presentation)
+    Presentation p = null;
+    RawMessage raw = null;
+    byte[] content = null; // unencrypted - encrypt at PeerPacket level
+    
     
     public String toString() {
-       return super.toString() + ":" + raw.toString() + ":" + cb;
+       return super.toString() + ":" + raw + ":" + cb;
     }
     
-    public PeerPacketMessage(Peer p, RawMessage raw, MessageSendCallback cb) {
-       this.peer = p;
-       this.raw = raw;
+    public PeerPacketMessage(Identity i, Message msg, MessageSendCallback cb,
+                            int priority) {
+       this.id = i;
+       this.msg = msg;
        this.cb = cb;
+       this.priority = priority;
+    }
+    
+    /** Set the message up to send on a connection using a specific
+     * Presentation.
+     * @param p the presentation to use to transform the message into
+     * a RawMessage and thence to a byte array. Can be null to clear the
+     * cached message.
+     */
+    // Can be called multiple times
+    public void resolve(Presentation p) {
+       if(this.p == p)
+           return;
+       this.p = p;
+       if(p == null) {
+           this.p = null;
+           this.raw = null;
+           this.content = null;
+           return;
+       }
+       this.raw = msg.toRawMessage(p);
        try {
            synchronized(bais) {
                bais.reset();
@@ -55,21 +90,43 @@
     }
     
     /**
-     * Notify the callback, after having sent (or failed to send) the message
+     * Notify the callback that we successfully sent the message.
      */
-    public void notifyDone(boolean success) {
+    public void notifySuccess() {
+       if(finished) {
+           Core.logger.log(this, "notifySuccess on "+this+" already finished!",
+                           Logger.ERROR);
+           return;
+       }
+       finished = true;
        if(cb == null) return;
        try {
-           if(success) {
-               cb.succeeded();
-           } else {
-               SendFailedException e =
-                   new SendFailedException(peer.getAddress(), false);
-               cb.thrown(e);
-           }
+           cb.succeeded();
        } catch (Throwable t) {
-           Core.logger.log(this, toString()+".notifyDone() caught "+t,
+           Core.logger.log(this, toString()+".notifySuccess() caught "+t,
                            t, Logger.ERROR);
+       }
+    }
+    
+    /**
+     * Notify the callback that we failed to send the message.
+     * @param detail the excuse exception
+     */
+    public void notifyFailure(SendFailedException detail) {
+       if(finished) {
+           Core.logger.log(this, "notifyFailure on "+this+" already finished!",
+                           Logger.ERROR);
+           return;
+       }
+       finished = true;
+       if(cb == null) return;
+       try {
+           cb.thrown(detail);
+       } catch (Throwable t) {
+           Core.logger.log(this, toString()+".notifyFailure("+detail+") caught "+
+                           t, t, Logger.ERROR);
+           Core.logger.log(this, toString()+".notifyFailure: detail was "+detail,
+                           detail, Logger.ERROR);
        }
     }
 }

Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.409
retrieving revision 1.410
diff -u -r1.409 -r1.410
--- Version.java        3 Oct 2003 08:35:19 -0000       1.409
+++ Version.java        4 Oct 2003 01:16:56 -0000       1.410
@@ -18,7 +18,7 @@
     public static String protocolVersion = "1.46";
     
     /** The build number of the current revision */
-    public static final int buildNumber = 6216;
+    public static final int buildNumber = 6217;
     // 6028: may 3; ARK retrieval fix
 
     public static final int ignoreBuildsAfter = 6500;

_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to